MQTT - Python Client

 This article shows how to implement an MQTT client in Python that connects to the Mosquitto MQTT broker set up on Ubuntu 20.04, as introduced in MQTT - Mosquitto MQTT broker set-up on the Ubuntu 20.04.

 This article is a continuation of that one. The mosquitto service port was originally 1883, but I changed it to 8883 there while explaining how to change the service port. Therefore, the examples in this article connect to the mosquitto service on port 8883.


Install Paho

 Paho is a Python client module provided by the Eclipse Foundation, which also hosts the Mosquitto MQTT broker. Use the pip command (pip3 on some systems) to install paho.

pip install paho-mqtt

 

Simple Subscriber

The following Python code works the same as the mosquitto_sub program. At the end of the previous article, we enabled SSL together with a user id/password, so the Python program does the same.

import paho.mqtt.client as paho
import ssl, sys

# Called for every message received on a subscribed topic.
def on_message(client, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("Topic[" + msg.topic + "] " + message)

# Called when the broker answers the connection request.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")
    elif rc == 2:
        print("Connection refused – invalid client identifier")
    elif rc == 3:
        print("Connection refused – server unavailable")
    elif rc == 4:
        print("Connection refused – bad username or password")
    elif rc == 5:
        print("Connection refused – not authorised")
    else:
        print("Connection refused – currently unused code")
    if rc == 0:
        # Subscribe only after the broker has accepted the connection.
        client.subscribe("test")
    else:
        sys.exit(1)

# On paho-mqtt 2.x, pass paho.CallbackAPIVersion.VERSION1 as the first
# argument to keep the callback signatures used above.
mqttc = paho.Client()
mqttc.username_pw_set(username="dongu", password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# ca.crt is the CA certificate created in the previous article.
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)

# Process network traffic and dispatch callbacks until interrupted.
mqttc.loop_forever()

I have copied the ca.crt certificate to the same directory as the Python source code.
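
The tls_set() call above pins the protocol with ssl.PROTOCOL_TLSv1_2, a constant that newer Python versions mark as deprecated. As an alternative, here is a minimal sketch that builds an ssl context requiring TLS 1.2 or newer. The helper name make_tls_context is hypothetical, and passing the result to paho through tls_set_context() is an assumption about how you would wire it in, not something taken from the previous article.

```python
import ssl

def make_tls_context(ca_file=None):
    """Build a client-side TLS context that requires TLS 1.2 or newer.

    ca_file is the path to the CA certificate (for example "./ca.crt").
    When it is None, the system's default CA certificates are used.
    """
    # create_default_context turns on certificate and hostname verification.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context

# No CA file is given here so the sketch runs anywhere;
# use make_tls_context("./ca.crt") to trust the broker's own CA.
context = make_tls_context()
print(context.minimum_version.name)

# Assumed wiring with paho, in place of mqttc.tls_set(...):
#   mqttc.tls_set_context(context)
```

Because hostname verification stays enabled, the broker certificate has to match the address you connect to.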


 We can see that the MQTT Python code works well when run on Windows.
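
The if/elif chain in on_connect can also be written as a lookup table. The following is a small sketch of that idea, not part of paho itself: the names CONNACK_MESSAGES and describe_connack are hypothetical, and it assumes rc is the plain integer return code that MQTT 3.1.1 brokers send, as in the subscriber above.

```python
# CONNACK return codes defined by MQTT 3.1.1 (values 0 to 5).
CONNACK_MESSAGES = {
    0: "Connection successful",
    1: "Connection refused - incorrect protocol version",
    2: "Connection refused - invalid client identifier",
    3: "Connection refused - server unavailable",
    4: "Connection refused - bad username or password",
    5: "Connection refused - not authorised",
}

def describe_connack(rc):
    """Return a readable message for an integer CONNACK return code."""
    return CONNACK_MESSAGES.get(rc, "Connection refused - currently unused code")

for code in (0, 4, 9):
    print(code, describe_connack(code))
```

Inside on_connect, a single print(describe_connack(rc)) would then replace the whole chain.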


Simple publisher + subscriber

 If you want to implement the publisher and the subscriber in a single program, it is convenient to use threading as follows:

import paho.mqtt.client as paho
import ssl, sys
import threading, time

def my_publish():
    while True:
        time.sleep(0.5)
        text = input('Publish Text:')
        if text:
            mqttc.publish("test", text)
            

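def on_message(clnt, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("\nReceived Topic[" + msg.topic+"]:" + message)

# With CallbackAPIVersion.VERSION2 (paho-mqtt 2.x), on_connect receives an
# extra properties argument and rc is a ReasonCode object that equals 0 on success.
def on_connect(client, userdata, flags, rc, properties):
    print("Connected with result code "+str(rc))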
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")    
    elif rc == 2:
        print("Connection refused – invalid client identifier")    
    elif rc == 3:
        print("Connection refused – server unavailable")    
    elif rc == 4:
        print("Connection refused – bad username or password")    
    elif rc == 5:
        print("Connection refused – not authorised")    
    else:
        print("Connection refused unused")  
    if rc == 0:
        mqttc.subscribe("test")
    else:
        sys.exit(1)

mqttc = paho.Client(paho.CallbackAPIVersion.VERSION2)
mqttc.username_pw_set(username="dongu",password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)
  
t1 = threading.Thread(target=my_publish)
t1.daemon = True
t1.start()
mqttc.loop_forever()

 <python code with both subscribe, publish>

 


Handling multiple topics 

This time, let's see how to handle the hierarchical topic structure introduced in MQTT - Mosquitto MQTT broker set-up on the Ubuntu 20.04.


Now suppose the publisher publishes to Home/Kitchen/temperature and Home/Kitchen/brightness. Let's write some code for a subscriber to receive both.

Modifying Python code is very easy.


Simple Subscriber that receives multiple topics

import paho.mqtt.client as paho
import ssl, sys

def on_message(clnt, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("Topic[" + msg.topic+"] " + message)

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")
    elif rc == 2:
        print("Connection refused – invalid client identifier")
    elif rc == 3:
        print("Connection refused – server unavailable")
    elif rc == 4:
        print("Connection refused – bad username or password")
    elif rc == 5:
        print("Connection refused – not authorised")
    else:
        print("Connection refused unused")
    if rc == 0:
        mqttc.subscribe("Home/Kitchen/#")
    else:
        sys.exit(1)

mqttc = paho.Client()
mqttc.username_pw_set(username="dongu",password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)

mqttc.loop_forever()

The only change is that the parameter of the subscribe function has changed. "Home/Kitchen/#" means to subscribe to all topics under Home/Kitchen.
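
To make the wildcard rule concrete, here is a minimal sketch of how a subscription filter is compared with a topic name. The function topic_matches is a hypothetical illustration written for this article. It covers "#" (all remaining levels) and "+" (exactly one level) and ignores special cases such as topics starting with "$". For real code, paho provides its own helper, topic_matches_sub, in paho.mqtt.client.

```python
def topic_matches(subscription, topic):
    """Return True if the topic name matches the subscription filter."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False
    # Without "#", both sides must have the same number of levels.
    return len(sub_levels) == len(topic_levels)

for name in ("Home/Kitchen/temperature", "Home/Kitchen/brightness", "Home/Garage/temperature"):
    print(name, topic_matches("Home/Kitchen/#", name))
```

With the filter "Home/+/temperature", the same function would accept the temperature topic of any room but not the brightness topic.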


Simple publisher + subscriber that sends multiple topics


import paho.mqtt.client as paho
import ssl, sys
import threading, time

def my_publish():
    while True:
        time.sleep(0.5)
        text = input('Publish Text:')
        if text:
            mqttc.publish("Home/Kitchen/temperature", text)            
            mqttc.publish("Home/Kitchen/brightness", text + ' - brightness')

def on_message(clnt, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("\nReceived Topic[" + msg.topic+"]:" + message)

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")
    elif rc == 2:
        print("Connection refused – invalid client identifier")
    elif rc == 3:
        print("Connection refused – server unavailable")
    elif rc == 4:
        print("Connection refused – bad username or password")
    elif rc == 5:
        print("Connection refused – not authorised")
    else:
        print("Connection refused unused")
    if rc == 0:
        mqttc.subscribe("Home/Kitchen/#")
    else:
        sys.exit(1)

mqttc = paho.Client()
mqttc.username_pw_set(username="dongu",password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)
  
t1 = threading.Thread(target=my_publish)
t1.daemon = True
t1.start()
mqttc.loop_forever()


The only changes are that the publish function now publishes to two topics and that the subscribe function uses "Home/Kitchen/#".

You can see that it works like this:



QoS Control 

As described in MQTT - Mosquitto MQTT broker set-up on the Ubuntu 20.04, MQTT supports three QoS levels. This time, let's see how to configure QoS.

Since the MQTT broker tracks subscribers by client id, the client id must stay the same across connections for QoS 1 and QoS 2 delivery to keep working after a reconnect. Therefore, when creating a Client object, the client_id value must be explicitly provided. If it is not, a unique client_id is generated automatically, so a different value is used each time the process restarts.

mqttc = paho.Client(client_id = "MyClient-1")          

Warning: The client id must be unique. If two or more clients use the same client_id, the broker forcibly terminates the earlier connection when the second one connects. If the clients reconnect automatically, the first client reconnects and disconnects the second, which then reconnects in turn. This ping-pong can repeat indefinitely and cause strange behavior.
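
One way to get a client id that is stable across restarts yet different per machine is to derive it from a role name and the host name. This is only a sketch: the helper make_client_id and the role name "kitchen-sub" are hypothetical, and it assumes that no two processes with the same role run on the same host.

```python
import socket

def make_client_id(role, host=None):
    """Build a client id that stays the same across restarts of one process."""
    # The host name keeps ids distinct between machines running the same role.
    host = host or socket.gethostname()
    return f"{role}-{host}"

client_id = make_client_id("kitchen-sub")
print(client_id)

# Assumed usage with paho (1.x-style constructor). clean_session=False asks the
# broker to keep the session, so queued QoS 1/2 messages survive a reconnect:
#   mqttc = paho.Client(client_id=client_id, clean_session=False)
```

Some brokers limit the client id length, so keep the role name short.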


MQTT provides 3 QoS levels:
  • QoS 0 – At Most Once (delivery not guaranteed)
  • QoS 1 – At Least Once (delivery guaranteed, duplicates possible)
  • QoS 2 – Exactly Once (delivery guaranteed, no duplicates)

The Python Client documentation describes the publish function, which sends a message, as follows.

Publishing

Send a message from the client to the broker.

publish()

publish(topic, payload=None, qos=0, retain=False)

This causes a message to be sent to the broker and subsequently from the broker to any clients subscribing to matching topics. It takes the following arguments:

topic
the topic that the message should be published on
payload
the actual message to send. If not given, or set to None a zero length message will be used. Passing an int or float will result in the payload being converted to a string representing that number. If you wish to send a true int/float, use struct.pack() to create the payload you require
qos
the quality of service level to use
retain
if set to True, the message will be set as the "last known good"/retained message for the topic.

Returns a MQTTMessageInfo which expose the following attributes and methods:

  • rc, the result of the publishing. It could be MQTT_ERR_SUCCESS to indicate success, MQTT_ERR_NO_CONN if the client is not currently connected, or MQTT_ERR_QUEUE_SIZE when max_queued_messages_set is used to indicate that message is neither queued nor sent.
  • mid is the message ID for the publish request. The mid value can be used to track the publish request by checking against the mid argument in the on_publish() callback if it is defined. wait_for_publish may be easier depending on your use-case.
  • wait_for_publish() will block until the message is published. It will raise ValueError if the message is not queued (rc == MQTT_ERR_QUEUE_SIZE).
  • is_published returns True if the message has been published. It will raise ValueError if the message is not queued (rc == MQTT_ERR_QUEUE_SIZE).

ValueError will be raised if topic is None, has zero length or is invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if the length of the payload is greater than 268435455 bytes.

Callback (publish)

When the message has been sent to the broker an on_publish() callback will be generated.
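
The documentation's remark about sending a true int/float with struct.pack() can be illustrated with a short sketch. The helper names encode_temperature and decode_temperature are hypothetical, and the little-endian 8-byte double format ("<d") is an assumption that the publisher and the subscriber would both have to agree on.

```python
import struct

def encode_temperature(value):
    """Pack a float into 8 bytes (little-endian IEEE 754 double)."""
    return struct.pack("<d", value)

def decode_temperature(payload):
    """Unpack the 8-byte payload produced by encode_temperature."""
    return struct.unpack("<d", payload)[0]

payload = encode_temperature(23.5)
print(len(payload), decode_temperature(payload))

# Assumed usage: mqttc.publish("Home/Kitchen/temperature", payload)
# In on_message, msg.payload holds the same bytes and can be passed to
# decode_temperature() instead of being decoded as UTF-8 text.
```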


It can be seen that QoS 0 is the default value. QoS 0 is the lightest transmission method and places the least load on the Mosquitto broker. If you only use MQTT for real-time data where an occasional lost message is acceptable, QoS 0 is a good choice. If reliable delivery is required without the possibility of duplicates, use QoS 2. This method is relatively slow because of the extra handshake that confirms delivery between the client and the broker.

If you want to adjust QoS of publishing, add the qos value to the publish function as follows.

def my_publish():
    while True:
        time.sleep(0.5)
        text = input('Publish Text:')
        if text:
            mqttc.publish("Home/Kitchen/temperature", text, qos=2)            
            mqttc.publish("Home/Kitchen/brightness", text + ' - brightness', qos=2)
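
With QoS 1 or 2 it is often useful to know whether a message was actually handed over to the broker. The following sketch uses the rc attribute, wait_for_publish() and is_published() described in the documentation above. The wrapper publish_and_wait is a hypothetical helper, and the timeout argument assumes a paho version whose wait_for_publish() accepts one. The network loop must be running in another thread, as it is when my_publish runs beside loop_forever().

```python
def publish_and_wait(client, topic, payload, qos=1, timeout=5.0):
    """Publish a message and report whether the broker confirmed it in time."""
    info = client.publish(topic, payload, qos=qos)
    if info.rc != 0:
        # 0 is MQTT_ERR_SUCCESS; anything else means the message was not accepted.
        return False
    # Block until the QoS handshake finishes or the timeout expires.
    info.wait_for_publish(timeout)
    return info.is_published()

# Assumed usage inside my_publish():
#   if not publish_and_wait(mqttc, "Home/Kitchen/temperature", text, qos=2):
#       print("Publish was not confirmed")
```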
            


The subscribe function can also take a qos value. The Python Client documentation describes the subscribe function as follows.


Subscribe / Unsubscribe

subscribe()

subscribe(topic, qos=0)

Subscribe the client to one or more topics.

This function may be called in three different ways:

Simple string and integer

e.g. subscribe("my/topic", 2)

topic
a string specifying the subscription topic to subscribe to.
qos
the desired quality of service level for the subscription. Defaults to 0.
String and integer tuple

e.g. subscribe(("my/topic", 1))

topic
a tuple of (topic, qos). Both topic and qos must be present in the tuple.
qos
not used.
List of string and integer tuples

e.g. subscribe([("my/topic", 0), ("another/topic", 2)])

This allows multiple topic subscriptions in a single SUBSCRIPTION command, which is more efficient than using multiple calls to subscribe().

topic
a list of tuple of format (topic, qos). Both topic and qos must be present in all of the tuples.
qos
not used.

The function returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not currently connected. mid is the message ID for the subscribe request. The mid value can be used to track the subscribe request by checking against the mid argument in the on_subscribe() callback if it is defined.

Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has zero string length, or if topic is not a string, tuple or list.

Callback (subscribe)

When the broker has acknowledged the subscription, an on_subscribe() callback will be generated.

unsubscribe()

unsubscribe(topic)

Unsubscribe the client from one or more topics.

topic
a single string, or list of strings that are the subscription topics to unsubscribe from.

Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to indicate success, or (MQTT_ERR_NO_CONN, None) if the client is not currently connected. mid is the message ID for the unsubscribe request. The mid value can be used to track the unsubscribe request by checking against the mid argument in the on_unsubscribe() callback if it is defined.

Raises a ValueError if topic is None or has zero string length, or is not a string or list.

Callback (unsubscribe)

When the broker has acknowledged the unsubscribe, an on_unsubscribe() callback will be generated.


Just like the publish function, you can add the qos value as a parameter.

    if rc == 0:
        mqttc.subscribe("Home/Kitchen/#", qos=2)
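
The list form of subscribe() described in the documentation is handy when the topics are known individually rather than through a wildcard. The sketch below builds such a list. The helper build_subscriptions is hypothetical, and applying the same qos to every topic is a simplification.

```python
def build_subscriptions(topics, qos=0):
    """Return a list of (topic, qos) tuples in the form subscribe() accepts."""
    if qos not in (0, 1, 2):
        raise ValueError("qos must be 0, 1 or 2")
    return [(topic, qos) for topic in topics]

subscriptions = build_subscriptions(
    ["Home/Kitchen/temperature", "Home/Kitchen/brightness"], qos=2
)
print(subscriptions)

# Assumed usage inside on_connect: mqttc.subscribe(subscriptions)
```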



Wrapping up

Implementing MQTT Client with Python is quite easy. Paho can be installed on various platforms, including Raspberry Pi OS and Mac, as well as Windows.  A detailed manual on using Paho can be found at https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php.

If you are interested in an MQTT client implemented in C/C++, see MQTT - C/C++ Client.

