RabbitMQ - Python Client (MQTT) #5: MQTT implementation in RabbitMQ

So far, we have looked at how to use RabbitMQ through the AMQP protocol. In addition to AMQP, RabbitMQ also supports the MQTT protocol. MQTT is more lightweight than AMQP and was designed with poor network conditions in mind. It is therefore well suited to IoT devices that have little computing power and are deployed outdoors.

A typical MQTT message broker is Mosquitto.

I have previously presented C++ and Python examples of working with the Mosquitto Message Broker.


However, RabbitMQ also supports the MQTT protocol. This time I will look at how to use the MQTT protocol in RabbitMQ.


Enabling MQTT in the RabbitMQ Docker image

The Docker images tested so far have MQTT disabled. To enable the MQTT plugin, a new Docker image must be built as follows.

# cat Dockerfile
FROM rabbitmq:management
RUN rabbitmq-plugins enable --offline rabbitmq_mqtt

Then build the new Docker image.

# docker build -t rmqcustom .


Run the new Docker image. Note the added port 1883, which is used for the MQTT service.

# docker run -it -p 15672:15672 -p 5672:5672 -p 1883:1883  rmqcustom

On the management page you can see that a listener on port 1883 has been added.
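If you prefer to confirm the listener from a script rather than the management page, a plain TCP connection test is enough. The following is a minimal sketch using only the standard library; the helper name `mqtt_port_open` and the default timeout are my own choices for illustration, and it only checks that the port accepts connections, not that the MQTT protocol handshake works.

```python
import socket


def mqtt_port_open(host: str, port: int = 1883, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises an OSError subclass on refusal or timeout
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `mqtt_port_open("192.168.150.128")` should return True once the container above is running with port 1883 published.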


In RabbitMQ, MQTT messages are received through the amq.topic exchange. Unlike AMQP clients, MQTT clients do not specify exchanges or queues directly. Messages are delivered using topics only.
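To make the relationship between MQTT topics and the amq.topic exchange more concrete: MQTT separates topic levels with "/" and uses "+" as its single-level wildcard, while AMQP topic exchanges separate words with "." and use "*". The sketch below illustrates that correspondence as I understand it from the RabbitMQ MQTT plugin documentation. The function name is hypothetical, and it is a simplification, not the plugin's actual code (it ignores edge cases such as topics that already contain dots).

```python
def mqtt_topic_to_routing_key(topic: str) -> str:
    """Map an MQTT topic or topic filter to an AMQP-style routing/binding key.

    Illustration only: "/" becomes ".", "+" becomes "*", "#" stays "#".
    """
    words = []
    for level in topic.split("/"):
        # "+" (one MQTT level) corresponds to "*" (one AMQP word)
        words.append("*" if level == "+" else level)
    return ".".join(words)


print(mqtt_topic_to_routing_key("topicA/topicB"))  # topicA.topicB
print(mqtt_topic_to_routing_key("topicA/+"))       # topicA.*
```

Seen this way, an AMQP consumer bound to amq.topic with the key "topicA.*" would be expected to receive messages that an MQTT client publishes to "topicA/topicB".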


Python MQTT

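I wrote three client programs, all using the paho-mqtt module. The first subscriber subscribes with the topic filter "topicA/*". Note that in standard MQTT the single-level wildcard is "+"; "*" is the wildcard of the AMQP topic exchange that RabbitMQ routes MQTT messages through.

The second subscriber subscribes to the exact topic "topicA/topicB".

The publisher sends a message to the topic "topicA/topicB".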
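For reference, here is how topic filters match topics under the standard MQTT rules: "+" matches exactly one level and "#" matches all remaining levels. This is a simplified sketch written for this post (the function name `topic_matches` is my own, and it ignores special cases such as "$"-prefixed topics and validation of malformed filters); it is not how RabbitMQ or paho implement matching.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches everything from here on
            return True
        if i >= len(topic_levels):
            # Filter is longer than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            # Neither a single-level wildcard nor a literal match
            return False
    # All filter levels matched; the topic must not have extra levels
    return len(filter_levels) == len(topic_levels)


print(topic_matches("topicA/+", "topicA/topicB"))       # True
print(topic_matches("topicA/topicB", "topicA/topicB"))  # True
print(topic_matches("topicA/*", "topicA/topicB"))       # False
```

Under these standard rules "*" is just a literal level name. The "topicA/*" filter used in this post presumably works only because RabbitMQ maps the subscription onto an amq.topic binding, where "*" is a wildcard. With a broker such as Mosquitto you would use "topicA/+" instead.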


import paho.mqtt.client as mqtt
import sys

# Define event callbacks
def on_connect(client, userdata, flags, rc):
    if rc:
        print("connect failed rc: " + str(rc))
        sys.exit()
    else:
        print("connected ")
    # Subscribe once connected (QoS defaults to 0)
    client.subscribe(topic)

def on_message(client, obj, msg):
    print("RCV topic[" + msg.topic + "] qos[" + str(msg.qos) + "]  payload[" + str(msg.payload) + "]")

def on_subscribe(client, obj, mid, granted_qos):
    print("Subscribed: " + str(mid) + " QOS:" + str(granted_qos))

topic = "topicA/*"
# With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first
# argument to keep the callback signatures used here
mqttc = mqtt.Client()
# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe

# Connect to the RabbitMQ MQTT listener (replace with your broker's address)
mqttc.connect("192.168.150.128", 1883)

# Run the network loop; this call blocks until the client disconnects
mqttc.loop_forever()

<mqtt_sub.py>



import paho.mqtt.client as mqtt
import sys

# Define event callbacks
def on_connect(client, userdata, flags, rc):
    if rc:
        print("connect failed rc: " + str(rc))
        sys.exit()
    else:
        print("connected ")
    # Subscribe once connected (QoS defaults to 0)
    client.subscribe(topic)

def on_message(client, obj, msg):
    print("RCV topic[" + msg.topic + "] qos[" + str(msg.qos) + "]  payload[" + str(msg.payload) + "]")

def on_subscribe(client, obj, mid, granted_qos):
    print("Subscribed: " + str(mid) + " QOS:" + str(granted_qos))

topic = "topicA/topicB"
# With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first
# argument to keep the callback signatures used here
mqttc = mqtt.Client()
# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe

# Connect to the RabbitMQ MQTT listener (replace with your broker's address)
mqttc.connect("192.168.150.128", 1883)

# Run the network loop; this call blocks until the client disconnects
mqttc.loop_forever()

<mqtt_sub2.py>


import paho.mqtt.client as mqtt
import sys

# Define event callbacks
def on_connect(client, userdata, flags, rc):
    if rc:
        print("connect failed rc: " + str(rc))
        sys.exit()
    else:
        print("connected ")
    # Publish one message once connected (QoS defaults to 0)
    client.publish(topic, "my message")

def on_message(client, obj, msg):
    print("RCV topic[" + msg.topic + "] qos[" + str(msg.qos) + "]  payload[" + str(msg.payload) + "]")

def on_publish(client, obj, mid):
    print("published: " + str(mid))

# Optional: enable with mqttc.on_log = on_log to trace client activity
def on_log(client, obj, level, string):
    print(string)

topic = "topicA/topicB"
# With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first
# argument to keep the callback signatures used here
mqttc = mqtt.Client()
# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish

# Connect to the RabbitMQ MQTT listener (replace with your broker's address)
mqttc.connect("192.168.150.128", 1883)

# Run the network loop; this call blocks, so stop the publisher with Ctrl+C
mqttc.loop_forever()

<mqtt_pub.py>


Run the two subscribers first, then run the publisher. As shown below, both waiting subscribers receive the message.

python mqtt_sub.py
connected
Subscribed: 1 QOS:(0,)
RCV topic[topicA/topicB] qos[0]  payload[b'my message']


python mqtt_sub2.py
connected
Subscribed: 1 QOS:(0,)
RCV topic[topicA/topicB] qos[0]  payload[b'my message']

python mqtt_pub.py
connected
published: 1


If you look at the management page while the MQTT clients are running, you can see that RabbitMQ creates a queue with a unique name whenever a subscriber connects. Each of these queues has its Auto delete property set to True, so the queue is deleted automatically when the subscriber's connection is closed.
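The same information can be read from the management plugin's HTTP API instead of the web page. The sketch below assumes the management API is reachable at http://localhost:15672 with the default guest/guest account; those values and both function names are assumptions for illustration, so adjust them to your setup. It lists the queues whose auto_delete flag is set.

```python
import base64
import json
import urllib.request
from typing import List


def fetch_queues(base_url: str = "http://localhost:15672",
                 user: str = "guest", password: str = "guest") -> str:
    """Fetch the raw JSON body of GET /api/queues using HTTP basic auth."""
    req = urllib.request.Request(base_url + "/api/queues")
    token = base64.b64encode((user + ":" + password).encode()).decode()
    req.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.read().decode("utf-8")


def auto_delete_queues(queues_json: str) -> List[str]:
    """Return the names of queues flagged auto_delete in an /api/queues body."""
    return [q["name"] for q in json.loads(queues_json) if q.get("auto_delete")]
```

Calling `auto_delete_queues(fetch_queues())` while the two subscribers are connected should list their queues, and the list should become empty again after they exit.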



The two queues in the above figure are created dynamically as subscribers connect.

Also, if you look at the amq.topic exchange, you can see that the binding is dynamically created as follows.


The binding and two queues in the picture above disappear when the subscriber exits.


Wrapping up

If you only need MQTT, you don't need RabbitMQ. I recommend using a much lighter-weight, MQTT-only message broker such as Mosquitto.

However, if you need to use MQTT together with AMQP, RabbitMQ can easily handle both.

The source code can be downloaded from my GitHub.

