c 客户端

Eclipse Paho C

1.支持 TCP/SSL Socket 连接
2.支持自动重连
3.支持 WebSocket
4.代码样例

#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"

#define ADDRESS     "tcp://localhost:1883"
#define CLIENTID    "ExampleClientPub"
#define TOPIC       "MQTT Examples"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000L

/*
 * Publishes PAYLOAD once to TOPIC on the broker at ADDRESS with QoS level QOS
 * and waits up to TIMEOUT milliseconds for the delivery to complete.
 *
 * Exits with -1 when the client cannot be created or connected; otherwise
 * returns the result of the publish / wait-for-completion step
 * (MQTTCLIENT_SUCCESS when the message was delivered).
 */
int main(int argc, char* argv[])
{
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;

    (void)argc;
    (void)argv;

    rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        exit(-1);
    }

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        /* Release the handle before bailing out so the client is not leaked. */
        MQTTClient_destroy(&client);
        exit(-1);
    }

    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        /* token is not valid when the publish call itself failed. */
        printf("Failed to publish message, return code %d\n", rc);
        goto disconnect;
    }

    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    if (rc == MQTTCLIENT_SUCCESS)
        printf("Message with delivery token %d delivered\n", token);
    else
        printf("Delivery of message with token %d not confirmed, return code %d\n",
                token, rc);

disconnect:
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}
mqttc

1.支持消息发布、主题订阅以及取消订阅
2.代码样例

mqttc -h host -p port -u username -P password -k keepalive
Eclipse Paho Embedded C

1.支持 MQTT V3.1 以及 V3.1.1 协议
2.支持 TCP/SSL Socket 连接
3.代码样例

#define MQTTCLIENT_QOS2 1
#include "MQTTClient.h"
#define DEFAULT_STACK_SIZE -1
#include "linux.cpp"

int arrivedcount = 0;

// Subscription callback: bumps the global arrival counter and prints the
// header fields and payload of the message carried by md.
void messageArrived(MQTT::MessageData& md)
{
    MQTT::Message &msg = md.message;

    ++arrivedcount;
    printf("Message %d arrived: qos %d, retained %d, dup %d, packetid %d\n",
        arrivedcount, msg.qos, msg.retained, msg.dup, msg.id);

    // The payload is printed with an explicit length, so it need not be
    // NUL-terminated.
    const int payloadLength = (int)msg.payloadlen;
    const char* payloadText = (char*)msg.payload;
    printf("Payload %.*s\n", payloadLength, payloadText);
}

int main(int argc, char* argv[])
{
    IPStack ipstack = IPStack();
    float version = 0.3;
    const char* topic = "mbed-sample";

    printf("Version is %f\n", version);

    MQTT::Client client = MQTT::Client(ipstack);

    const char* hostname = "iot.eclipse.org";
    int port = 1883;
    printf("Connecting to %s:%d\n", hostname, port);
    int rc = ipstack.connect(hostname, port);
    if (rc != 0)
        printf("rc from TCP connect is %d\n", rc);

    printf("MQTT connecting\n");
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;
    data.clientID.cstring = (char*)"mbed-icraggs";
    rc = client.connect(data);
    if (rc != 0)
        printf("rc from MQTT connect is %d\n", rc);
    printf("MQTT connected\n");

    rc = client.subscribe(topic, MQTT::QOS2, messageArrived);
    if (rc != 0)
        printf("rc from MQTT subscribe is %d\n", rc);

    MQTT::Message message;

    // QoS 0
    char buf[100];
    sprintf(buf, "Hello World!  QoS 0 message from app version %f", version);
    message.qos = MQTT::QOS0;
    message.retained = false;
    message.dup = false;
    message.payload = (void*)buf;
    message.payloadlen = strlen(buf)+1;
    rc = client.publish(topic, message);
    if (rc != 0)
        printf("Error %d from sending QoS 0 message\n", rc);
    else while (arrivedcount == 0)
        client.yield(100);

    rc = client.unsubscribe(topic);
    if (rc != 0)
        printf("rc from unsubscribe was %d\n", rc);

    rc = client.disconnect();
    if (rc != 0)
        printf("rc from disconnect was %d\n", rc);

    ipstack.disconnect();

    return 0;
}
libmosquitto

libmosquitto 是 C 语言共享库,可以创建 MQTT 客户端程序。所有的 API 函数都有 mosquitto_ 前缀。


// Retrieve the library version; the result is written to the three
// output parameters.
int mosquitto_lib_version(int *major, int *minor, int *revision);

// Library initialisation and cleanup.
int mosquitto_lib_init(void);
int mosquitto_lib_cleanup(void);

// Create a new mosquitto client object and return it as a struct mosquitto
// pointer.
struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata);

// Free a mosquitto client object.
void mosquitto_destroy(struct mosquitto *mosq);
libemqtt

1.Embedded C client library for the MQTT protocol. It also provides a binding for Python.
2.编译

// C Library
$ make

// Python binding
$ make python
wolfMQTT

This is an implementation of the MQTT Client written in C for embedded use, which supports SSL/TLS via the wolfSSL library. This library was built from the ground up to be multi-platform, space conscious and extensible. It integrates with wolfSSL to provide TLS support.

Example code

// Top-level application interface of the MQTT client.
// MqttClient_Init takes the client object, the network callbacks (net), the
// message callback (msg_cb), caller-provided transmit/receive buffers with
// their lengths, and the command timeout in milliseconds.
int MqttClient_Init(MqttClient *client, MqttNet *net, MqttMsgCb msg_cb, byte *tx_buf, int tx_buf_len, byte *rx_buf, int rx_buf_len, int cmd_timeout_ms);

// These APIs block on MqttNet.read until an error occurs or the timeout
// (cmd_timeout_ms) expires:
int MqttClient_Connect(MqttClient *client, MqttConnect *connect);
int MqttClient_Publish(MqttClient *client, MqttPublish *publish);
int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe);
int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe);
int MqttClient_Ping(MqttClient *client);
int MqttClient_Disconnect(MqttClient *client);

// Blocks waiting for a new publish message to arrive, for at most timeout_ms
// milliseconds.
int MqttClient_WaitMessage(MqttClient *client, MqttMessage *message, int timeout_ms);

// Network connect / disconnect interfaces that wrap the MqttNet callbacks and
// handle wolfSSL TLS:
int MqttClient_NetConnect(MqttClient *client, const char* host, word16 port, int timeout_ms, int use_tls, MqttTlsCb cb);
int MqttClient_NetDisconnect(MqttClient *client);

// Helper: maps a return code to a string.
const char* MqttClient_ReturnCodeToString(int return_code);

C++ 客户端

Eclipse Paho C++

MQTT 3.1, MQTT 3.1.1
LWT
Standard TCP Support
Non-Blocking API
Blocking API
Message Persistence

Example code

int main(int argc, char* argv[])
{
    sample_mem_persistence persist;
    mqtt::client client(ADDRESS, CLIENTID, &persist);

    callback cb;
    client.set_callback(cb);

    mqtt::connect_options connOpts;
    connOpts.set_keep_alive_interval(20);
    connOpts.set_clean_session(true);

    try {
        std::cout << "Connecting..." << std::flush;
        client.connect(connOpts);
        std::cout << "OK" << std::endl;

        // First use a message pointer.

        std::cout << "Sending message..." << std::flush;
        mqtt::message_ptr pubmsg = std::make_shared(PAYLOAD1);
        pubmsg->set_qos(QOS);
        client.publish(TOPIC, pubmsg);
        std::cout << "OK" << std::endl;

        // Now try with itemized publish.

        std::cout << "Sending next message..." << std::flush;
        client.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2)+1, 0, false);
        std::cout << "OK" << std::endl;

        // Now try with a listener, but no token

        std::cout << "Sending final message..." << std::flush;
        pubmsg = std::make_shared(PAYLOAD3);
        pubmsg->set_qos(QOS);
        client.publish(TOPIC, pubmsg);
        std::cout << "OK" << std::endl;

        // Disconnect
        std::cout << "Disconnecting..." << std::flush;
        client.disconnect();
        std::cout << "OK" << std::endl;
    }
    catch (const mqtt::persistence_exception& exc) {
        std::cerr << "Persistence Error: " << exc.what() << " ["
            << exc.get_reason_code() << "]" << std::endl;
        return 1;
    }
    catch (const mqtt::exception& exc) {
        std::cerr << "Error: " << exc.what() << " ["
            << exc.get_reason_code() << "]" << std::endl;
        return 1;
    }

    return 0;
}
Eclipse Paho Embedded C++

MQTT 3.1, MQTT 3.1.1
LWT
SSL / TLS
Blocking API
High Availability

erlang 客户端

emqttc

1.异步的 Erlang MQTT 客户端,支持 MQTT V3.1 以及 V3.1.1 协议
2.支持 TCP/SSL Socket 连接
3.支持自动重连
4.支持 Keepalive, ping/pong
5.代码样例

%% Start the client and connect to the broker on localhost.
%% The pattern match fails unless start_link returns {ok, C}.
{ok, C} = emqttc:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}]),

%% Subscribe to TopicA at qos0.
emqttc:subscribe(C, <<"TopicA">>, qos0),

%% Publish a payload to the topic that was just subscribed to.
emqttc:publish(C, <<"TopicA">>, <<"Payload...">>),

%% Wait for a {publish, Topic, Payload} message in this process's mailbox;
%% give up and report a timeout after 1000 ms.
receive
    {publish, Topic, Payload} ->
        io:format("Message Received from ~s: ~p~n", [Topic, Payload])
after
    1000 ->
        io:format("Error: receive timeout!~n")
end,

%% Disconnect from the broker.
emqttc:disconnect(C).
erlmqtt

An Erlang client library for MQTT 3.1 (but not for MQTT 3.1.1, yet).

Opening a connection and subscribing

1> {ok, C} = erlmqtt:open_clean("localhost", []).
{ok,<0.35.0>}
2> {ok, _} = erlmqtt:subscribe(C, [{"topic/#", at_most_once}]).
{ok,[at_most_once]}

Receiving messages

5> erlmqtt:recv_message(1000).
{<<"topic/a/b">>,<<"payload">>}
6> erlmqtt:recv_message(1000).
timeout
7> erlmqtt:publish(C, "topic/foo", <<"payload">>).
8> erlmqtt:poll_message().
{<<"topic/foo">>,<<"payload">>}
9> erlmqtt:poll_message().
none

java 客户端

Eclipse Paho Java

MQTT 3.1, MQTT 3.1.1
LWT
SSL / TLS
Message Persistence
Automatic Reconnect
Offline Buffering
Will buffer messages whilst offline to send when the connection is re-established.
WebSocket Support
Standard TCP Support
Non-Blocking API
Blocking API
High Availability

Example code

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal publisher: connects to a broker with a clean session, publishes a
 * single message at QoS 2, then disconnects.
 */
public class MqttPublishSample {

    /**
     * Runs the sample. On success the JVM exits with status 0; on an
     * {@link MqttException} the details of the failure are printed.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        String topic        = "MQTT Examples";
        String content      = "Message from MqttPublishSample";
        int qos             = 2;
        String broker       = "tcp://iot.eclipse.org:1883";
        String clientId     = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "+broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: "+content);
            // Encode explicitly as UTF-8 so the bytes on the wire do not
            // depend on the platform default charset.
            MqttMessage message = new MqttMessage(
                    content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        }
    }
}
Xenqtt

XenQTT is an MQTT support library and application suite that offers clients powerful and innovative features for working in an MQTT-enabled ecosystem.

MeQanTT

This is a project showing the basics of building an MQTT client. For anyone wanting to use it to implement their own client, there is work to be done. The code in this project is not production quality.

mqtt-client

mqtt-client provides an ASL 2.0 licensed API to MQTT. It takes care of automatically reconnecting to your MQTT server and restoring your client session if any network failures occur. Applications can use a blocking API style, a futures based API, or a callback/continuations passing API style.

Controlling MQTT Options

setClientId : Use to set the client Id of the session. This is what an MQTT server uses to identify a session where setCleanSession(false); is being used. The id must be 23 characters or less. Defaults to auto generated id (based on your socket address, port and timestamp).

setCleanSession : Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.

setKeepAlive : Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout.

setUserName : Sets the user name used to authenticate against the server.

setPassword : Sets the password used to authenticate against the server.

setWillTopic: If set the server will publish the client's Will message to the specified topics if the client has an unexpected disconnection.

setWillMessage: The Will message to send. Defaults to a zero length message.

setWillQos : Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.

setWillRetain: Set to true if you want the Will to be published with the retain option.

setVersion: Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.

javascript 客户端

Eclipse Paho HTML5 JavaScript over WebSocket.

MQTT 3.1, MQTT 3.1.1
LWT
SSL / TLS
Message Persistence
WebSocket Support
Non-Blocking API
High Availability

Example code

// Create a client instance.
// Declared with var so the sample also runs in strict mode, where assigning
// to an undeclared identifier throws a ReferenceError.
var client = new Paho.MQTT.Client(location.hostname, Number(location.port), "clientId");

// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

// connect the client
client.connect({onSuccess:onConnect});

// called when the client connects
function onConnect() {
  // Once a connection has been made, make a subscription and send a message.
  console.log("onConnect");
  client.subscribe("World");
  var message = new Paho.MQTT.Message("Hello");
  message.destinationName = "World";
  client.send(message);
}

// called when the client loses its connection
function onConnectionLost(responseObject) {
  // Only a non-zero error code is logged.
  if (responseObject.errorCode !== 0) {
    console.log("onConnectionLost:"+responseObject.errorMessage);
  }
}

// called when a message arrives
function onMessageArrived(message) {
  console.log("onMessageArrived:"+message.payloadString);
}
MQTT.js

MQTT.js is a client library for the MQTT protocol, written in JavaScript for node.js and the browser.

Example code

var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://test.mosquitto.org');

// On connect: subscribe to 'presence' and publish a greeting to it.
function handleConnect() {
  client.subscribe('presence');
  client.publish('presence', 'Hello mqtt');
}

// On message: print the payload (a Buffer) as text, then end the client.
function handleMessage(topic, message) {
  console.log(message.toString());
  client.end();
}

client.on('connect', handleConnect);
client.on('message', handleMessage);
node_mqtt_client

This is a simple MQTT client on node.js

The library is still under development.

API

Event: ‘sessionOpened’
Fired when a session with the broker is opened successfully.

Event: ‘mqttData’
Fired when data is available for client, topic and payload have been extracted from data.
// Register a handler for the 'mqttData' event. Straight quotes are required:
// typographic quotes are not valid string delimiters in JavaScript.
client.addListener('mqttData', function(topic, payload){

});

Event: ‘openSessionFailed’
Fired when a session can not be established.

Event: ‘connectTimeOut’
Fired when a connection with the server cannot be established.

MQTTClient(port, host, clientID);
Construct an instance of the MQTT client.
@port: port number, like 1883.
@host: server ip address.
@clientID: client name for server.

subscribe(sub_topic);
Subscribe to a topic.
@sub_topic: topic to be subscribed.

publish(pub_topic, msg);
Publish messages to a topic.
@pub_topic: publish topics.
@msg: payload data, can be anything, string, bytes.

disconnect();
Disconnect from the server.
ascoltatori

The pub/sub library for node backed by Redis, MongoDB, AMQP (RabbitMQ), ZeroMQ, MQTT (Mosquitto) or just plain node!

Example code

var ascoltatori = require('ascoltatori');

// Settings passed to ascoltatori.build below.
// Declared with var: an undeclared assignment creates an implicit global and
// throws in strict mode.
var settings = {
  type: 'mqtt',
  json: false,
  mqtt: require('mqtt'),
  url: 'mqtt://127.0.0.1:1883'
};

// The callback receives an error (if any) and the built ascoltatore.
ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

Objective-C 客户端

mqttIO-objC

Example code

#pragma mark - MQtt Callback methods

// Session event callback: logs each session event and, on a connection
// error, reconnects to the broker.
// NOTE(review): the reconnect uses `session`, not the `sender` parameter;
// presumably `session` is an instance variable of the enclosing class -
// confirm.
- (void)session:(MQTTSession*)sender handleEvent:(MQTTSessionEvent)eventCode {
    switch (eventCode) {
        case MQTTSessionEventConnected:
            NSLog(@"connected");

            break;
        case MQTTSessionEventConnectionRefused:
            NSLog(@"connection refused");
            break;
        case MQTTSessionEventConnectionClosed:
            NSLog(@"connection closed");
            break;
        case MQTTSessionEventConnectionError:
            NSLog(@"connection error");
            NSLog(@"reconnecting...");
            // Forcing reconnection
            [session connectToHost:@"q.m2m.io" port:1883];
            break;
        case MQTTSessionEventProtocolError:
            NSLog(@"protocol error");
            break;
    }
}
MQTTKit

MQTTKit is a modern event-driven Objective-C library for MQTT 3.1.

It uses Mosquitto 1.2.3 library.

Send a Message

// Create the client with a unique client ID.
// The `...` is a placeholder the reader must replace with a real value.
// NOTE(review): the client is created as a local here but used as
// self.client below; presumably it is meant to be stored in a property.
NSString *clientID = ...
MQTTClient *client = [[MQTTClient alloc] initWithClientId:clientID];

// Connect to the MQTT server; the completion handler receives the
// connection return code.
[self.client connectToHost:@"iot.eclipse.org" 
         completionHandler:^(NSUInteger code) {
    if (code == ConnectionAccepted) {
        // When the client is connected, send an MQTT message.
        [self.client publishString:@"Hello, MQTT"
                           toTopic:@"/MQTTKit/example"
                           withQos:AtMostOnce
                            retain:NO
                 completionHandler:^(int mid) {
            NSLog(@"message has been delivered");
        }];
    }
}];

Subscribe to a Topic and Receive Messages

// Define the handler that will be called when MQTT messages are received by
// the client.
[self.client setMessageHandler:^(MQTTMessage *message) {
    // Property access takes no surrounding brackets; brackets are only for
    // message sends such as [message payloadString].
    NSString *text = message.payloadString;
    NSLog(@"received message %@", text);
}];

// Connect the MQTT client.
[self.client connectToHost:@"iot.eclipse.org"
         completionHandler:^(MQTTConnectionReturnCode code) {
    if (code == ConnectionAccepted) {
        // When the client is connected, subscribe to the topic to receive
        // messages.
        [self.client subscribe:@"/MQTTKit/example"
         withCompletionHandler:nil];
    }
}];

Disconnect from the server

// Disconnect from the server and log once the completion handler runs.
[self.client disconnectWithCompletionHandler:^(NSUInteger code) {
    // The client is disconnected when this completion handler is called
    NSLog(@"MQTT client is disconnected");
}];
MQTT-Client-Framework

Create a new client and connect to a broker:

#import "MQTTClient.h"

@interface MyDelegate : ... <MQTTSessionDelegate>
...

        // Configure the socket transport for the broker on localhost:1883.
        MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
        transport.host = @"localhost";
        transport.port = 1883;

        // Create the session and attach the transport to it.
        MQTTSession *session = [[MQTTSession alloc] init];
        session.transport = transport;

    // Delegate callbacks are delivered to this object.
    session.delegate = self;

    [session connectAndWaitTimeout:30];  //this is part of the synchronous API

Subscribe to a topic:

// Subscribe to example/# at level 2; the handler receives either an error or
// the array of granted QoS values and logs the outcome.
[session subscribeToTopic:@"example/#" atLevel:2 subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss){
    if (error) {
        NSLog(@"Subscription failed %@", error.localizedDescription);
    } else {
        NSLog(@"Subscription sucessfull! Granted Qos: %@", gQoss);
    }
 }]; // this is part of the block API

Add the following to receive messages for the subscribed topics

 // Delegate callback for a received message. It is passed the session, the
 // payload data, the topic, the QoS level, the retained flag and a message
 // id; the sample body is intentionally empty.
 - (void)newMessage:(MQTTSession *)session
    data:(NSData *)data
    onTopic:(NSString *)topic
    qos:(MQTTQosLevel)qos
    retained:(BOOL)retained
    mid:(unsigned int)mid {
    // this is one of the delegate callbacks
}

Publish a message to a topic:

// Publish `data` to "topic" at QoS level at-least-once, not retained.
[session publishAndWaitData:data
                    onTopic:@"topic"
                     retain:NO
                    qos:MQTTQosLevelAtLeastOnce]; // NOTE(review): the "AndWait" name matches connectAndWaitTimeout, which this document labels synchronous; presumably this is part of the synchronous API too - confirm

swift 客户端

CocoaMQTT

1.iOS 以及 macOS 下的 MQTT 客户端,支持 MQTT v3.1.1 协议
2.适配 Swift 3.0
3.支持 Carthage 以及 CocoaPods 方式安装
4.支持 TCP/SSL Socket 连接
5.支持 Keepalive, ping/pong
6.代码样例

// Build a per-process client ID. Swift 3 renamed NSProcessInfo to
// ProcessInfo, and this sample targets Swift 3.0.
let clientID = "CocoaMQTT-" + String(ProcessInfo().processIdentifier)
let mqtt = CocoaMQTT(clientID: clientID, host: "localhost", port: 1883)

// Credentials, will message and keep-alive are set before connecting.
mqtt.username = "test"
mqtt.password = "public"
mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
mqtt.keepAlive = 90
mqtt.delegate = self
mqtt.connect()

PHP 客户端

phpMQTT

Publish code

<?php
require("../phpMQTT.php");

// Publishes a single timestamped message and closes the connection.
$mqtt = new phpMQTT("example.com", 1883, "phpMQTT Pub Example"); //Change client name to something unique
if ($mqtt->connect()) {
    $mqtt->publish("bluerhinos/phpMQTT/examples/publishtest","Hello World! at ".date("r"),0);
    $mqtt->close();
} else {
    // Report the failure instead of exiting silently.
    echo "Failed to connect to the MQTT broker\n";
}
?>
Mosquitto-PHP

This is an extension to allow using the Eclipse Mosquitto™ MQTT client library with PHP.

Example code

<?php

// Create a client and register a connect callback that publishes 'Hello' to
// 'mgdm/test' (third argument 2 - presumably the QoS level; confirm).
$c = new Mosquitto\Client;
$c->onConnect(function() use ($c) {
    $c->publish('mgdm/test', 'Hello', 2);
});

$c->connect('test.mosquitto.org');

// Run the client loop 100 times so the callback and publish can complete.
for ($i = 0; $i < 100; $i++) {
    // Loop around to permit the library to do its work
    $c->loop(1);
}

echo "Finished\n";
sskaje's MQTT library

Example code

// Create a client for $MQTT_SERVER (second argument '123' - presumably the
// client ID; confirm) and select protocol version 3.1.1.
$mqtt = new MQTT($MQTT_SERVER, '123');
$mqtt->setVersion(MQTT::VERSION_3_1_1);
// Alternative for MQTT 3.1: $mqtt->setVersion(MQTT::VERSION_3_1);
Debug::Enable();
$mqtt->setKeepalive(60);
// Abort when the connection cannot be established.
$connected = $mqtt->connect();
if (!$connected) {
    die("Not connected\n");
}
log_test('publish() QoS 0');
// Publish to topic 'qos/0'.
// NOTE(review): the meaning of the four trailing 0 arguments is not visible
// here; check the library's publish() signature.
$mqtt->publish(
    'qos/0',
    'This is a QoS 0 Message',
    0,
    0,
    0,
    0
);

python 客户端

Eclipse Paho Python

Example code

import paho.mqtt.client as mqtt

# Invoked once the server has answered the connection attempt with a CONNACK.
def on_connect(client, userdata, flags, rc):
    """Print the connect result code, then subscribe to ``$SYS/#``."""
    print("Connected with result code %s" % (rc,))

    # Subscribing from inside this callback renews the subscription on every
    # reconnect after a lost connection.
    sys_topics = "$SYS/#"
    client.subscribe(sys_topics)

# Invoked for every PUBLISH message delivered by the server.
def on_message(client, userdata, msg):
    """Print the message topic and payload separated by a single space."""
    payload_text = str(msg.payload)
    print(" ".join((msg.topic, payload_text)))

# Create the client and register the two callbacks defined above.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# Connect to the broker: host, port 1883, and 60 as the third argument
# (presumably the keepalive in seconds - confirm).
client.connect("iot.eclipse.org", 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
nyamuk

Nyamuk is a python MQTT library, originally based on libmosquitto.
It implements both 3.1 and 3.1.1 versions of MQTT protocol.
Currently only supporting python 2.7

Publishing a message with Qos 1 (with MQTT v3.1.1)

import sys
from nyamuk import *

def nloop(client):
    """Run one write/read cycle on ``client`` and return its next event."""
    # Flush the write buffer (messages sent to the MQTT server).
    client.packet_write()
    # Fill the read buffer (enqueue received messages).
    client.loop()
    # Hand back the first received message, dequeued.
    next_event = client.pop_event()
    return next_event

# Python 2 script: connect to the broker (version=4 - this example's heading
# says MQTT v3.1.1), publish one QoS 1 message, then disconnect.
client = Nyamuk("test_nyamuk", server="test.mosquitto.org")
ret = client.connect(version=4)
ret = nloop(client) # ret should be EventConnack object
# Exit with status 1 unless a CONNACK with return code 0 was received.
if not isinstance(ret, EventConnack) or ret.ret_code != 0:
    print 'connection failed'; sys.exit(1)

client.publish('foo/bar', 'this is a test', qos=1)
ret = nloop(client) # ret should be EventPuback (not checked here)

client.disconnect()

Subscribing a topic

import sys
from nyamuk import *

def nloop(client):
    """Run one write/read cycle on ``client`` and return its next event."""
    # Flush the write buffer (messages sent to the MQTT server).
    client.packet_write()
    # Fill the read buffer (enqueue received messages).
    client.loop()
    # Hand back the first received message, dequeued.
    next_event = client.pop_event()
    return next_event

# Python 2 script: connect, subscribe to 'foo/bar' at QoS 1, then print every
# received message until interrupted with Ctrl-C.
client = Nyamuk("test_nyamuk", server="test.mosquitto.org")
ret = client.connect(version=4)
ret = nloop(client) # ret should be EventConnack object
# Exit with status 1 unless a CONNACK with return code 0 was received.
if not isinstance(ret, EventConnack) or ret.ret_code != 0:
    print 'connection failed'; sys.exit(1)

client.subscribe('foo/bar', qos=1)
ret = nloop(client)
# Exit with status 2 when the next event is not a SUBACK.
if not isinstance(ret, EventSuback):
    print 'SUBACK not received'; sys.exit(2)
print 'granted qos is', ret.granted_qos[0]

try:
    # Poll for events forever; only publish events are handled.
    while True:
        evt = nloop(client)
        if isinstance(evt, EventPublish):
            print 'we received a message: {0} (topic= {1})'.format(evt.msg.payload, evt.msg.topic)

            # received message is either qos 0 or 1
            # in case of qos 1, we must send back PUBACK message with same packet-id
            if evt.msg.qos == 1:
                client.puback(evt.msg.mid)

except KeyboardInterrupt:
    # Ctrl-C ends the receive loop; fall through to disconnect.
    pass

client.disconnect()
MQTT-For-Twisted-Python

MQTT for Twisted Python

Usage:
Subclass MQTTProtocol and implement *received methods

HBMQTT

HBMQTT is an open source MQTT client and broker implementation.

Built on top of asyncio, Python's standard asynchronous I/O framework, HBMQTT provides a straightforward API based on coroutines, making it easy to write highly concurrent applications.

Subscribe example

import logging
import asyncio

from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2

#
# This sample shows how to subscribe to topics and receive data from incoming messages.
# It subscribes to the '$SYS/broker/uptime' and '$SYS/broker/load/#' topics and displays
# the first 99 messages delivered by the broker.
#

logger = logging.getLogger(__name__)

@asyncio.coroutine
def uptime_coro():
    """Connect, subscribe to two $SYS topics and print 99 delivered messages.

    After the loop the coroutine unsubscribes from both topics and
    disconnects. A ClientException raised inside the try block is logged
    rather than propagated.
    """
    C = MQTTClient()
    yield from C.connect('mqtt://test.mosquitto.org/')
    # Subscribe to '$SYS/broker/uptime' with QOS_1 and to
    # '$SYS/broker/load/#' with QOS_2
    yield from C.subscribe([
                ('$SYS/broker/uptime', QOS_1),
                ('$SYS/broker/load/#', QOS_2),
             ])
    logger.info("Subscribed")
    try:
        # range(1, 100) yields 99 iterations, one message per iteration.
        for i in range(1, 100):
            message = yield from C.deliver_message()
            packet = message.publish_packet
            print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
        yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        logger.info("UnSubscribed")
        yield from C.disconnect()
    except ClientException as ce:
        logger.error("Client exception: %s" % ce)

# Script entry point: configure INFO-level logging, then run uptime_coro to
# completion on the event loop.
if __name__ == '__main__':
    formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.get_event_loop().run_until_complete(uptime_coro())

ruby 客户端

ruby-mqtt

Pure Ruby gem that implements the MQTT protocol, a lightweight protocol for publish/subscribe messaging.

Also includes a class for parsing and generating MQTT-SN packets.

Example code

require 'rubygems'
require 'mqtt'

# Publish example: connect, publish 'message' to the 'test' topic inside the
# block.
MQTT::Client.connect('test.mosquitto.org') do |c|
  c.publish('test', 'message')
end

# Subscribe example: print every message received on the 'test' topic.
MQTT::Client.connect('test.mosquitto.org') do |c|
  # If you pass a block to the get method, then it will loop
  c.get('test') do |topic,message|
    puts "#{topic}: #{message}"
  end
end
mosquitto

Example code

require 'mosquitto'

# Create the client; "blocking" is the argument passed to the constructor
# (presumably the client identifier - confirm).
publisher = Mosquitto::Client.new("blocking")

# Spawn a network thread with a main loop
publisher.loop_start

# On publish callback: prints the id passed to it.
publisher.on_publish do |mid|
  p "Published #{mid}"
end

# On connect callback: prints the return code, then publishes.
publisher.on_connect do |rc|
  p "Connected with return code #{rc}"
  # publish a test message once connected
  publisher.publish(nil, "topic", "test message", Mosquitto::AT_MOST_ONCE, true)
end

# Connect to MQTT broker
publisher.connect("test.mosquitto.org", 1883, 10)

# Allow some time for processing
sleep 2

publisher.disconnect

# Optional, stop the threaded loop - the network thread would be reaped on Ruby exit as well
publisher.loop_stop(true)