RabbitMQ - Python Client #6: MQ Connection Management
Applications that hold long-lived connections to a message queue need to manage those connections carefully. I recently wrote a daemon that sends messages to RabbitMQ, and connection management gave me a hard time.
I have written several guides on managing RabbitMQ connections. This article uses Python instead of C/C++, through pika, a Python client library for RabbitMQ.
Preparations
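First, create the exchange and the queues in RabbitMQ in advance. The examples below publish to an exchange named sample_ex with the routing keys rk_A and rk_B, which deliver to the queues QA and QB.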
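For readers who prefer to do this setup from code rather than from the management page, here is a minimal sketch. The exchange type (direct), the durable flag, the rk_A to QA and rk_B to QB mapping, and the helper name declare_topology are all assumptions on my part; adjust them to match your own broker.

```python
# Sketch only: exchange type "direct", durable=True, and the rk_A -> QA /
# rk_B -> QB mapping are assumptions made for illustration.
BINDINGS = {"QA": "rk_A", "QB": "rk_B"}


def declare_topology(channel, exchange="sample_ex", bindings=BINDINGS):
    """Declare the exchange, the queues, and their bindings on a pika channel."""
    channel.exchange_declare(exchange=exchange, exchange_type="direct",
                             durable=True)
    for queue_name, routing_key in bindings.items():
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(queue=queue_name, exchange=exchange,
                           routing_key=routing_key)


def main():
    """Run the setup against a local broker (not called automatically)."""
    import pika  # imported here so declare_topology has no hard dependency

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="127.0.0.1", port=5672))
    try:
        declare_topology(connection.channel())
    finally:
        connection.close()
```

Calling main() with a running broker should create everything in one go. If the exchange or queues already exist with different properties, the broker rejects the declaration, so match the flags to what you created by hand.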
Application for one-time sending of messages
import pika
import pika.exceptions
import time

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    #connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672 , heartbeat=60, tcp_options={'TCP_KEEPIDLE':10}))
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672))
    channel = connection.channel()

def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    for n in range(2):
        channel.basic_publish(ex, key[n], msg.encode())

connectMQ()
sample_pub()
<simple_pub.py>
When this Python code is executed, one message is sent to QA and one to QB.
RabbitMQ Connection Timeout
This Python script only establishes a connection with RabbitMQ and then waits without doing anything.
The heartbeat value of ConnectionParameters is not specified. In that case pika accepts the value proposed by the broker, which is 60 seconds by default.
import pika
import sys
import time
import os

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672))
    channel = connection.channel()

def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    for n in range(2):
        channel.basic_publish(ex, key[n], msg.encode())

connectMQ()
while True:
    time.sleep(1)
<simple_conn.py>
If you run this Python code and look at Connections on the RabbitMQ management page, you will see that a new connection has been created. Now observe this connection for a while: it disappears after about 3 minutes.
You can also monitor the connection from the shell by combining the watch and netstat commands as follows.
[root@localhost rabbit]# watch -n 5 "netstat -ano |grep 5672"
Every 5.0s: netstat -ano |grep 5672        localhost.localdomain: Sat May 20 14:52:27 2023

tcp    0   0 0.0.0.0:25672          0.0.0.0:*             LISTEN       off (0.00/0/0)
tcp    0   0 0.0.0.0:15672          0.0.0.0:*             LISTEN       off (0.00/0/0)
tcp    0   0 127.0.0.1:44892        127.0.0.1:5672        ESTABLISHED  off (0.00/0/0)
tcp    0   0 192.168.150.146:15672  192.168.150.1:19828   ESTABLISHED  off (0.00/0/0)
tcp6   0   0 :::5672                :::*                  LISTEN       off (0.00/0/0)
tcp6   0   0 127.0.0.1:5672         127.0.0.1:44892       ESTABLISHED  off (0.00/0/0)
As on the RabbitMQ management page, you can see that the connection disappears after about 3 minutes. Setting a larger heartbeat value would keep an idle connection alive longer. However, it is better practice to respond properly to the heartbeats sent by RabbitMQ than to set the heartbeat to a recklessly large value.
Judging from this observation, RabbitMQ appears to forcibly close the client's connection when the client sends nothing, neither messages nor heartbeat responses, for about 3 times the heartbeat value.
Therefore, for a client program to maintain a connection with RabbitMQ, one of the following two conditions must be satisfied.
- The client must respond to the heartbeats sent by RabbitMQ within the heartbeat value.
- The client must send messages before the timeout expires (about 3 times the heartbeat value in the test above).
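The first condition can be turned into a simple rule of thumb: service the connection well before one heartbeat interval passes without any I/O. The following is a minimal sketch of that bookkeeping. The class name IdleTracker and the half-interval margin are my own assumptions, not part of pika.

```python
import time


class IdleTracker:
    """Track how long a connection has been idle (hypothetical helper, not pika API)."""

    def __init__(self, heartbeat, clock=time.monotonic):
        self.heartbeat = heartbeat      # negotiated heartbeat value in seconds
        self._clock = clock             # injectable so the logic can be tested
        self._last_io = clock()

    def mark_io(self):
        """Call after every publish or process_data_events() call."""
        self._last_io = self._clock()

    def needs_service(self):
        """True once half a heartbeat interval has passed without any I/O."""
        return self._clock() - self._last_io >= self.heartbeat / 2
```

A daemon loop could check needs_service() on each iteration and call connection.process_data_events() followed by mark_io() whenever it returns True.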
Recognize disconnection and reconnect MQ
import pika
import sys
import time
import os

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60))
    channel = connection.channel()

def reconnect():
    connectMQ()

index = 0

def sample_pub():
    global index
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    print("Send message to exchange ...[%d]" % index)
    for n in range(2):
        try:
            channel.basic_publish(ex, key[n], msg.encode())
        except pika.exceptions.AMQPConnectionError as e:
            print("[%d] Connection is closed [pika.exceptions.AMQPConnectionError]" % (n))
            reconnect()
            print("==== resend ====")
            channel.basic_publish(ex, key[n], msg.encode())

connectMQ()
print("Wait for disconnection ...")
time.sleep(4 * 60)  # After 4 minutes, the connection might be broken.
while True:
    sample_pub()
    time.sleep(5)
    if index == 2:
        break
    index += 1
print("Test end")
<simple_reconnect.py>
The Python program above handles the exceptions raised by the basic_publish function. If the connection has been lost, it reconnects and resends the message that failed.
[root@localhost rabbit]# python3 simple_reconnect.py
Wait for disconnection ...
Send message to exchange ...[1]
[0] Connection is closed [pika.exceptions.AMQPConnectionError]
==== resend ====
Send message to exchange ...[2]
Test end
When you run the program, the connection is dropped after about 3 minutes, so the basic_publish call made after 4 minutes raises an exception. The exception handler then reconnects and resends the message.
If you check RabbitMQ, you can see that two messages are received normally by QA and QB.
Tip: With BlockingConnection, a closed connection is only noticed when you call a function that sends or receives a packet. In the example above, the disconnection is detected through the exception raised by basic_publish.
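The reconnect-and-resend pattern can be pulled out into a small helper that also bounds the number of attempts, since a resend can fail as well. This is only a sketch under my own assumptions: the function name, the get_channel and reconnect callables, and the retry_on parameter are hypothetical. With pika you would pass retry_on=(pika.exceptions.AMQPConnectionError,).

```python
def publish_with_reconnect(get_channel, reconnect, exchange, routing_key, body,
                           retry_on=(ConnectionError,), max_attempts=3):
    """Publish one message, reconnecting and resending if the connection is gone.

    get_channel: callable returning the current channel object.
    reconnect:   callable that re-establishes the connection and channel.
    retry_on:    exception types that signal a lost connection.
    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            get_channel().basic_publish(exchange, routing_key, body)
            return attempt
        except retry_on:
            if attempt == max_attempts:
                raise           # give up and let the caller decide
            reconnect()         # then loop around and resend
```

Passing the channel through a callable rather than as a value matters here: after reconnect() the old channel object is dead, so each attempt has to fetch the current one.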
Keeping MQ Connection Using Heartbeat Responses
In pika, the function that processes the frames received from RabbitMQ, heartbeats included, and sends the responses is process_data_events.
import pika
import sys
import time
import os
import threading

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60))
    channel = connection.channel()

def reconnect():
    connectMQ()

def do_heartbeat():
    while True:
        time.sleep(30)
        print("heartbeat response")
        connection.process_data_events()

connectMQ()
t = threading.Thread(target=do_heartbeat, args=())
t.start()
while True:
    time.sleep(1)
print("Test end")
<keep_conn1.py>
The example above calls the process_data_events function every 30 seconds. Among other things, process_data_events handles the responses to the heartbeats sent by RabbitMQ, so the MQ connection no longer drops after 3 minutes.
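When the program has only one thread and just needs to idle, a separate heartbeat thread may not be necessary. BlockingConnection also provides a sleep() method, which keeps processing I/O events, heartbeats included, while it waits. A minimal sketch follows; the helper name idle and its step parameter are my own.

```python
def idle(connection, total_seconds, step=1.0):
    """Wait for total_seconds while letting pika service the connection.

    connection is expected to be a pika.BlockingConnection; its sleep()
    method processes I/O events (heartbeats included) while it waits.
    """
    remaining = total_seconds
    while remaining > 0:
        chunk = min(step, remaining)
        connection.sleep(chunk)     # used in place of time.sleep(chunk)
        remaining -= chunk
```

In a script like simple_conn.py, replacing time.sleep(1) in the main loop with connection.sleep(1) should have the same effect as the heartbeat thread, with everything staying on the thread that created the connection.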
However, a real application also needs to send messages at any time, and there are a few things to be aware of when doing both with pika.
Caution: pika's basic_publish and process_data_events functions are not thread-safe. Take care never to call them from two or more threads at the same time.
Keeping MQ Connection and Thread Safe
As noted above, process_data_events and basic_publish must not be used at the same time from two or more threads. You must therefore either use a mutex or use pika.BlockingConnection.add_callback_threadsafe().
Using mutex : Not recommended
The following example uses a mutex to prevent process_data_events and basic_publish from running concurrently. It sends a message every 0.01 second and also handles the heartbeat response every 30 seconds. While messages are being sent, the connection is not dropped even if no heartbeat response is sent.
import pika
import pika.exceptions
import sys
import time
import os
import threading

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None
mtx = threading.Lock()  # serializes every use of the connection

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60))
    channel = connection.channel()

def reconnect():
    connectMQ()

def do_heartbeat():
    while True:
        time.sleep(30)
        print("heartbeat response")
        # "with" releases the lock even if process_data_events raises.
        with mtx:
            connection.process_data_events()

index = 0

def sample_pub():
    global index
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    #print("Send message to exchange ...[%d]"%index)
    with mtx:
        for n in range(2):
            try:
                channel.basic_publish(ex, key[n], msg.encode())
            except pika.exceptions.AMQPConnectionError as e:
                print("[%d] Connection is closed [pika.exceptions.AMQPConnectionError]" % (n))
                reconnect()
                print("==== resend ====")
                channel.basic_publish(ex, key[n], msg.encode())

connectMQ()
t = threading.Thread(target=do_heartbeat, args=())
t.start()
while True:
    sample_pub()
    index += 1
    time.sleep(0.01)
print("Test end")
<keep_conn2.py>
The mutex version above works fine in my tests. However, the official pika documentation does not recommend this approach, so it may not work properly in future versions.
The pika documentation explains:
add_callback_threadsafe(callback)
Requests a call to the given function as soon as possible in the context of this connection’s thread.
NOTE: This is the only thread-safe method in BlockingConnection. All other manipulations of BlockingConnection must be performed from the connection’s thread.
NOTE: the callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events() and BlockingChannel.start_consuming().
For example, a thread may request a call to the BlockingChannel.basic_ack method of a BlockingConnection that is running in a different thread via:
connection.add_callback_threadsafe(
    functools.partial(channel.basic_ack, delivery_tag=...))
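The documentation's example builds the callback with functools.partial. One practical reason to prefer it over a lambda when scheduling callbacks inside a loop is that a lambda looks up loop variables only when it finally runs, and a callback passed to add_callback_threadsafe runs some time later. The sketch below shows the difference in plain Python, with lists standing in for pika's callback queue; publish is a stand-in of mine, not a pika function.

```python
import functools

keys = ['rk_A', 'rk_B']
sent = []


def publish(routing_key):
    """Stand-in for channel.basic_publish; records what would be sent."""
    sent.append(routing_key)


# Callbacks are queued first and executed later, as with add_callback_threadsafe.
with_lambda = []
with_partial = []
for n in range(2):
    with_lambda.append(lambda: publish(keys[n]))               # reads n when called
    with_partial.append(functools.partial(publish, keys[n]))   # binds keys[n] now

for cb in with_lambda:
    cb()
lambda_result = list(sent)      # ['rk_B', 'rk_B']: both see the final n

sent.clear()
for cb in with_partial:
    cb()
partial_result = list(sent)     # ['rk_A', 'rk_B']
```

With the lambda, both queued callbacks would publish with the last routing key, so one queue would receive two messages and the other none.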
Using add_callback_threadsafe: Recommended
Finally, this is the method recommended by pika. When two or more threads need to send or receive MQ messages, the add_callback_threadsafe function lets them do so in a thread-safe manner.
The mutex is no longer needed. Other threads hand basic_publish to the connection's thread through add_callback_threadsafe, and the thread that created the connection runs the queued callbacks, and answers heartbeats, inside process_data_events.
import functools
import threading
import time

import pika
import pika.exceptions

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60))
    channel = connection.channel()

def reconnect():
    connectMQ()

def publish(ex, routing_key, body):
    # Always runs on the connection's thread, so it sees the current channel.
    channel.basic_publish(ex, routing_key, body)

index = 0

def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    for n in range(2):
        # functools.partial binds key[n] now; a lambda would read n only when
        # the callback finally runs on the connection's thread.
        connection.add_callback_threadsafe(functools.partial(publish, ex, key[n], msg.encode()))

def do_publish():
    # Worker thread: it only schedules publishes and never touches the channel.
    global index
    while True:
        try:
            sample_pub()
            index += 1
        except pika.exceptions.AMQPConnectionError:
            # In pika 1.x, add_callback_threadsafe raises when the connection is already closed.
            print("Connection is closed [pika.exceptions.AMQPConnectionError]")
            time.sleep(1)  # give the main thread time to reconnect
        #time.sleep(0.01)

connectMQ()
t = threading.Thread(target=do_publish, daemon=True)
t.start()
# The thread that created the connection drives all I/O. Queued callbacks and
# heartbeats are both handled inside process_data_events().
while True:
    try:
        connection.process_data_events(time_limit=1)
    except pika.exceptions.AMQPConnectionError:
        print("Connection is closed, reconnecting")
        reconnect()
<keep_conn3.py>
I intentionally left the sleep out of the publishing thread to see whether the two threads would cause problems for each other. The publishing thread queues basic_publish calls as fast as it can while the connection's thread is busy inside process_data_events.
Note: If basic_publish is called directly from the publishing thread in the above code, without add_callback_threadsafe, errors are very likely to occur.
Most importantly, functions like basic_publish and process_data_events must run in the context of the thread that created the MQ connection, since pika functions are not safe in a multi-threaded environment.
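Another way to honor this rule, one that does not come from the pika documentation, is to let a single thread own the connection and have every other thread hand it messages through a queue.Queue. The following is a minimal sketch with no broker involved. The publish parameter stands for whatever actually sends the message (for example a function wrapping channel.basic_publish), and all names here are hypothetical.

```python
import queue
import threading

_STOP = object()  # sentinel telling the owner thread to exit


def owner_loop(outbox, publish, service=None, poll_interval=1.0):
    """Run in the thread that owns the connection.

    outbox:  queue.Queue of (exchange, routing_key, body) tuples.
    publish: callable that sends one message; only this thread calls it.
    service: optional callable run whenever the queue is idle, e.g. one
             that calls connection.process_data_events() for heartbeats.
    """
    while True:
        try:
            item = outbox.get(timeout=poll_interval)
        except queue.Empty:
            if service is not None:
                service()
            continue
        if item is _STOP:
            break
        publish(*item)


def demo():
    """Two producer threads feed one owner thread; returns what was published."""
    outbox = queue.Queue()
    published = []
    owner = threading.Thread(
        target=owner_loop,
        args=(outbox, lambda ex, key, body: published.append((ex, key, body))))
    owner.start()

    def produce(key):
        for i in range(3):
            outbox.put(('sample_ex', key, ('msg %d' % i).encode()))

    producers = [threading.Thread(target=produce, args=(k,))
                 for k in ('rk_A', 'rk_B')]
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    outbox.put(_STOP)   # queued after all messages, so nothing is lost
    owner.join()
    return published
```

The trade-off is that producers get no direct feedback when a publish fails, so error handling and reconnection would have to live inside the owner thread.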
Wrapping up
If you write a daemon program that talks to RabbitMQ through pika, you may run into problems maintaining connections. It is important to keep the connection from breaking where possible and to restore it when it does break. Using a publisher built on BlockingConnection as the example, we looked at how to maintain and restore a connection in a multi-threaded environment.