MQTT - Python Client
This article shows how to implement an MQTT client directly in Python. The client connects to the Mosquitto MQTT broker that was set up in MQTT - Mosquitto MQTT broker set-up on the Ubuntu 20.04.
This article is a continuation of that article. The mosquitto service port was originally 1883, but I changed it to 8883 while explaining how to change the service port there. Therefore, the examples in this article connect to the mosquitto service on port 8883.
Install Paho
Paho is a Python client module provided by the Eclipse Foundation, which also hosts the Mosquitto MQTT broker. Use the pip command (pip3 on some systems) to install paho-mqtt.
pip install paho-mqtt
Simple Subscriber
The following Python code works the same as the mosquitto_sub program. At the end of the previous article, we enabled SSL together with a user id and password, so the Python program does the same.
import paho.mqtt.client as paho
import ssl, sys


def on_message(clnt, userdata, msg):
    # Payloads arrive as bytes; decode them before printing.
    message = msg.payload.decode('utf-8')
    print("Topic[" + msg.topic + "] " + message)


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")
    elif rc == 2:
        print("Connection refused – invalid client identifier")
    elif rc == 3:
        print("Connection refused – server unavailable")
    elif rc == 4:
        print("Connection refused – bad username or password")
    elif rc == 5:
        print("Connection refused – not authorised")
    else:
        print("Connection refused – unused return code")
    if rc == 0:
        mqttc.subscribe("test")
    else:
        sys.exit(1)


# paho-mqtt 1.x constructor. With paho-mqtt 2.x, pass
# paho.CallbackAPIVersion.VERSION1 as the first argument to keep the
# four-argument on_connect signature used above.
mqttc = paho.Client()
mqttc.username_pw_set(username="dongu", password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# Verify the broker certificate against the CA certificate in this directory.
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)
mqttc.loop_forever()
I copied the ca.crt certificate to the same directory as the Python source code.
The MQTT Python code runs well on Windows.
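The chain of return-code branches in on_connect can also be written as a lookup table. The following is a minimal sketch, assuming the numeric return codes 0 to 5 listed in the handler above; the names CONNACK_MESSAGES and describe_connack are hypothetical and not part of Paho.

```python
# Return codes as listed in the on_connect handler above (MQTT 3.1/3.1.1).
# CONNACK_MESSAGES and describe_connack are illustrative names, not Paho APIs.
CONNACK_MESSAGES = {
    0: "Connection successful",
    1: "Connection refused - incorrect protocol version",
    2: "Connection refused - invalid client identifier",
    3: "Connection refused - server unavailable",
    4: "Connection refused - bad username or password",
    5: "Connection refused - not authorised",
}


def describe_connack(rc):
    """Return a readable message for a numeric connection return code."""
    return CONNACK_MESSAGES.get(rc, "Connection refused - unknown return code " + str(rc))


if __name__ == "__main__":
    for code in (0, 4, 42):
        print(code, describe_connack(code))
```

Inside on_connect, the whole if/elif chain could then be replaced by print(describe_connack(rc)).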
Simple publisher + subscriber
If you want to implement the publisher and the subscriber in a single program, it is convenient to run the publisher in its own thread as follows:
import paho.mqtt.client as paho
import ssl, sys
import threading, time


def my_publish():
    # Runs in a background thread: read a line and publish it.
    while True:
        time.sleep(0.5)
        text = input('Publish Text:')
        if text:
            mqttc.publish("test", text)


def on_message(clnt, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("\nReceived Topic[" + msg.topic + "]:" + message)


def on_connect(client, userdata, flags, rc, properties):
    # With CallbackAPIVersion.VERSION2, rc is a ReasonCode object rather than
    # a plain int. It compares equal to 0 on success and prints as a readable
    # reason string, so the numeric 1-5 branches of the 1.x handler do not apply.
    print("Connected with result code " + str(rc))
    if rc == 0:
        print("Connection successful")
        mqttc.subscribe("test")
    else:
        print("Connection refused")
        sys.exit(1)


# paho-mqtt 2.x constructor with the version 2 callback API.
mqttc = paho.Client(paho.CallbackAPIVersion.VERSION2)
mqttc.username_pw_set(username="dongu", password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)

# The daemon thread ends automatically when the main thread exits.
t1 = threading.Thread(target=my_publish)
t1.daemon = True
t1.start()
mqttc.loop_forever()
Handling multiple topics
This time, let's see how to handle the hierarchical topics introduced in MQTT - Mosquitto MQTT broker set-up on the Ubuntu 20.04.
Now suppose the publisher publishes to Home/Kitchen/temperature and Home/Kitchen/brightness, and let's write a subscriber that receives both.
Modifying Python code is very easy.
Simple Subscriber that receives multiple topics
import paho.mqtt.client as paho
import ssl, sys


def on_message(clnt, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("Topic[" + msg.topic + "] " + message)


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")
    elif rc == 2:
        print("Connection refused – invalid client identifier")
    elif rc == 3:
        print("Connection refused – server unavailable")
    elif rc == 4:
        print("Connection refused – bad username or password")
    elif rc == 5:
        print("Connection refused – not authorised")
    else:
        print("Connection refused – unused return code")
    if rc == 0:
        # "#" subscribes to every topic below Home/Kitchen.
        mqttc.subscribe("Home/Kitchen/#")
    else:
        sys.exit(1)


# paho-mqtt 1.x constructor; with 2.x, pass paho.CallbackAPIVersion.VERSION1 first.
mqttc = paho.Client()
mqttc.username_pw_set(username="dongu", password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)
mqttc.loop_forever()
The only change is the argument passed to the subscribe function. "Home/Kitchen/#" means to subscribe to all topics under Home/Kitchen.
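To make the wildcard behaviour concrete, here is a minimal sketch of how a topic filter is compared with a topic name, level by level. It assumes the wildcard rules of the MQTT specification: "#" matches the parent level and everything below it, and "+" (not used in this article) matches exactly one level. The function name topic_matches is hypothetical, and the sketch ignores the special handling of topics that start with "$". Paho ships its own helper, topic_matches_sub in paho.mqtt.client, which is the better choice in real code.

```python
def topic_matches(subscription, topic):
    """Return True if an MQTT topic filter matches a concrete topic name.

    Illustrative sketch only; it does not handle "$"-prefixed system topics.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False
    # Without "#", both must have the same number of levels.
    return len(sub_levels) == len(topic_levels)


if __name__ == "__main__":
    for name in ("Home/Kitchen/temperature", "Home/Kitchen/brightness", "Home/Garage/temperature"):
        print(name, topic_matches("Home/Kitchen/#", name))
```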
Simple publisher + subscriber that sends multiple topics
import paho.mqtt.client as paho
import ssl, sys
import threading, time


def my_publish():
    # Runs in a background thread: publish each input line to two topics.
    while True:
        time.sleep(0.5)
        text = input('Publish Text:')
        if text:
            mqttc.publish("Home/Kitchen/temperature", text)
            mqttc.publish("Home/Kitchen/brightness", text + ' - brightness')


def on_message(clnt, userdata, msg):
    message = msg.payload.decode('utf-8')
    print("\nReceived Topic[" + msg.topic + "]:" + message)


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    if rc == 0:
        print("Connection successful")
    elif rc == 1:
        print("Connection refused – incorrect protocol version")
    elif rc == 2:
        print("Connection refused – invalid client identifier")
    elif rc == 3:
        print("Connection refused – server unavailable")
    elif rc == 4:
        print("Connection refused – bad username or password")
    elif rc == 5:
        print("Connection refused – not authorised")
    else:
        print("Connection refused – unused return code")
    if rc == 0:
        mqttc.subscribe("Home/Kitchen/#")
    else:
        sys.exit(1)


# paho-mqtt 1.x constructor; with 2.x, pass paho.CallbackAPIVersion.VERSION1 first.
mqttc = paho.Client()
mqttc.username_pw_set(username="dongu", password="dongdong")
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set("./ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
rc = mqttc.connect("192.168.11.86", 8883, 60)

t1 = threading.Thread(target=my_publish)
t1.daemon = True
t1.start()
mqttc.loop_forever()
QoS Control
As described in MQTT - Mosquitto MQTT broker set-up on the Ubuntu 20.04, MQTT supports three levels of QoS. This time, let's see how to configure QoS.
Since the MQTT broker manages subscribers based on the client id, the client id must stay the same across restarts to benefit from QoS 1 and QoS 2. Therefore, when creating a Client object, the client_id value must be provided explicitly. If it is not, a unique client_id value is generated automatically, so a different client_id is used each time the process is restarted.
mqttc = paho.Client(client_id = "MyClient-1")
Warning: The client id must be unique. If two or more clients use the same client_id, the broker forcibly terminates the earlier connection when the second one connects. If the clients reconnect automatically, the second client is then disconnected when the first one reconnects. This ping-pong can repeat indefinitely and cause strange behavior.
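One way to satisfy both requirements (stable across restarts, yet unique per client) is to derive the client id from fixed properties of the installation. The following is a minimal sketch, assuming the host name plus an instance number is unique in your network; make_client_id and its parameters are hypothetical names, not part of Paho.

```python
import socket


def make_client_id(app_name, instance=1):
    """Build a client id that is stable across restarts of the same process.

    Assumption: the host name combined with the instance number is unique
    among all clients of the broker.
    """
    host = socket.gethostname()
    return "{}-{}-{}".format(app_name, host, instance)


if __name__ == "__main__":
    print(make_client_id("MyClient"))
```

The result can be passed as client_id when creating the Client object. For the broker to actually keep the session between connections with MQTT 3.x, the client also has to be created with clean_session=False.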
- QoS 0 – At most once (delivery not guaranteed)
- QoS 1 – At least once (delivery guaranteed, duplicates possible)
- QoS 2 – Exactly once (delivery guaranteed, no duplicates)
Publishing
Send a message from the client to the broker.
publish()
publish(topic, payload=None, qos=0, retain=False)
This causes a message to be sent to the broker and subsequently from the broker to any clients subscribing to matching topics. It takes the following arguments:
- topic: the topic that the message should be published on.
- payload: the actual message to send. If not given, or set to None, a zero length message will be used. Passing an int or float will result in the payload being converted to a string representing that number. If you wish to send a true int/float, use struct.pack() to create the payload you require.
- qos: the quality of service level to use.
- retain: if set to True, the message will be set as the "last known good"/retained message for the topic.
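The struct.pack() remark for the payload argument can be illustrated as follows. This is a minimal sketch, assuming the publisher and the subscriber agree on an 8-byte big-endian double as the wire format; the helper names encode_temperature and decode_temperature are hypothetical.

```python
import struct


def encode_temperature(value):
    """Pack a float into 8 bytes (big-endian IEEE 754 double)."""
    return struct.pack(">d", value)


def decode_temperature(payload):
    """Unpack the 8-byte payload produced by encode_temperature()."""
    return struct.unpack(">d", payload)[0]


if __name__ == "__main__":
    payload = encode_temperature(21.5)
    print(len(payload), decode_temperature(payload))
```

The bytes returned by encode_temperature() can be passed as the payload argument of publish(). On the receiving side, on_message would call decode_temperature(msg.payload) instead of msg.payload.decode('utf-8').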
Returns an MQTTMessageInfo which exposes the following attributes and methods:
- rc, the result of the publishing. It could be MQTT_ERR_SUCCESS to indicate success, MQTT_ERR_NO_CONN if the client is not currently connected, or MQTT_ERR_QUEUE_SIZE when max_queued_messages_set is used to indicate that message is neither queued nor sent.
- mid is the message ID for the publish request. The mid value can be used to track the publish request by checking against the mid argument in the on_publish() callback if it is defined. wait_for_publish may be easier depending on your use-case.
- wait_for_publish() will block until the message is published. It will raise ValueError if the message is not queued (rc == MQTT_ERR_QUEUE_SIZE).
- is_published returns True if the message has been published. It will raise ValueError if the message is not queued (rc == MQTT_ERR_QUEUE_SIZE).
A ValueError will be raised if topic is None, has zero length or is invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if the length of the payload is greater than 268435455 bytes.
Callback (publish)
When the message has been sent to the broker an on_publish() callback will be generated.
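Tracking publish requests through the mid value could look like the sketch below. It is kept independent of Paho so it can run on its own; the class PublishTracker and its method names are hypothetical. The commented lines at the end show how it might be wired to a client, under the assumption that on_publish receives the mid as its third argument.

```python
import threading


class PublishTracker:
    """Remember which publish requests are still waiting for on_publish()."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}

    def sent(self, mid, description):
        # Call right after publish() with the mid of the returned info object.
        with self._lock:
            self._pending[mid] = description

    def confirmed(self, mid):
        # Call from on_publish(); returns None if the mid was not registered.
        with self._lock:
            return self._pending.pop(mid, None)

    def pending(self):
        # Snapshot of the requests that have not been confirmed yet.
        with self._lock:
            return dict(self._pending)


if __name__ == "__main__":
    tracker = PublishTracker()
    tracker.sent(1, "temperature")
    tracker.sent(2, "brightness")
    tracker.confirmed(1)
    print(tracker.pending())

# Possible wiring (assumption, not executed here):
#   info = mqttc.publish("Home/Kitchen/temperature", "21.5", qos=1)
#   tracker.sent(info.mid, "temperature")
#   def on_publish(client, userdata, mid, *args):
#       tracker.confirmed(mid)
```

Because the callback may fire before sent() has been called, confirmed() deliberately tolerates an unknown mid instead of raising an error.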
To publish with a specific QoS, pass the qos argument to publish():
def my_publish():
    while True:
        time.sleep(0.5)
        text = input('Publish Text:')
        if text:
            mqttc.publish("Home/Kitchen/temperature", text, qos=2)
            mqttc.publish("Home/Kitchen/brightness", text + ' - brightness', qos=2)
The subscribe function also accepts a qos value. The Python client documentation describes the subscribe function as follows.
Subscribe / Unsubscribe
subscribe()
subscribe(topic, qos=0)
Subscribe the client to one or more topics.
This function may be called in three different ways:
Simple string and integer
e.g. subscribe("my/topic", 2)
- topic: a string specifying the subscription topic to subscribe to.
- qos: the desired quality of service level for the subscription. Defaults to 0.
String and integer tuple
e.g. subscribe(("my/topic", 1))
- topic: a tuple of (topic, qos). Both topic and qos must be present in the tuple.
- qos: not used.
List of string and integer tuples
e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
This allows multiple topic subscriptions in a single SUBSCRIBE command, which is more efficient than using multiple calls to subscribe().
- topic: a list of tuples of the format (topic, qos). Both topic and qos must be present in all of the tuples.
- qos: not used.
The function returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not currently connected. mid is the message ID for the subscribe request. The mid value can be used to track the subscribe request by checking against the mid argument in the on_subscribe() callback if it is defined.
Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has zero string length, or if topic is not a string, tuple or list.
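The three call forms describe the same thing: a list of (topic, qos) pairs. The following minimal sketch makes that equivalence explicit and mirrors the documented ValueError conditions. It is an illustration written for this article, not Paho's implementation, and normalize_subscription is a hypothetical name.

```python
def normalize_subscription(topic, qos=0):
    """Turn any of the three documented call forms into a list of (topic, qos)."""
    if isinstance(topic, str):
        pairs = [(topic, qos)]      # simple string and integer
    elif isinstance(topic, tuple):
        pairs = [topic]             # single (topic, qos) tuple, qos argument unused
    elif isinstance(topic, list):
        pairs = list(topic)         # list of (topic, qos) tuples, qos argument unused
    else:
        raise ValueError("topic must be a string, tuple or list")
    for name, level in pairs:
        if not name:
            raise ValueError("topic must not be None or empty")
        if level not in (0, 1, 2):
            raise ValueError("qos must be 0, 1 or 2")
    return pairs


if __name__ == "__main__":
    print(normalize_subscription("my/topic", 2))
    print(normalize_subscription(("my/topic", 1)))
    print(normalize_subscription([("my/topic", 0), ("another/topic", 2)]))
```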
Callback (subscribe)
When the broker has acknowledged the subscription, an on_subscribe() callback will be generated.
unsubscribe()
unsubscribe(topic)
Unsubscribe the client from one or more topics.
- topic
- a single string, or list of strings that are the subscription topics to unsubscribe from.
Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to indicate success, or (MQTT_ERR_NO_CONN, None) if the client is not currently connected. mid is the message ID for the unsubscribe request. The mid value can be used to track the unsubscribe request by checking against the mid argument in the on_unsubscribe() callback if it is defined.
Raises a ValueError if topic is None or has zero string length, or is not a string or list.
Callback (unsubscribe)
When the broker has acknowledged the unsubscribe, an on_unsubscribe() callback will be generated.
Just like the publish function, subscribe accepts the qos value as a parameter.
if rc == 0:
    mqttc.subscribe("Home/Kitchen/#", qos=2)
Wrapping up
Implementing an MQTT client with Python is quite easy. Paho can be installed on various platforms, including Raspberry Pi OS and macOS as well as Windows. A detailed manual on using Paho can be found at https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php.
If you are interested in an MQTT client implemented in C/C++, see MQTT - C/C++ Client.