RabbitMQ - Python Client #6 : MQ Connection Management

Applications that keep a persistent connection to a message queue need to manage that connection carefully. I recently wrote a daemon that publishes messages to RabbitMQ, and connection management was the part that gave me the most trouble.

This post collects several guidelines for managing RabbitMQ connections. This time we use Python instead of C/C++, so the examples rely on pika, a Python client library for RabbitMQ.


Preparations

First, create the exchange and the queues in RabbitMQ in advance, as follows.




If you want to learn more about queues and exchanges, please read the following pages first.

With these settings, a message published to the exchange sample_ex with routing key rk_A is stored in queue QA, and a message published with routing key rk_B is stored in queue QB.
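The same exchange, queues and bindings can also be declared from code instead of the management page. The following is a minimal sketch under two assumptions: the exchange type is direct (the text above only says that rk_A routes to QA and rk_B routes to QB), and pika's default non-durable flags are acceptable. EXCHANGE, BINDINGS and declare_topology are hypothetical names; the function only needs an object with pika's BlockingChannel methods, such as the result of connection.channel().

```python
# Hypothetical names: EXCHANGE, BINDINGS and declare_topology are not from the
# article; they mirror the setup described above (sample_ex, QA/rk_A, QB/rk_B).
EXCHANGE = "sample_ex"
BINDINGS = {"QA": "rk_A", "QB": "rk_B"}  # queue name -> routing key


def declare_topology(channel, exchange=EXCHANGE, bindings=BINDINGS):
    """Declare the exchange and queues, then bind each queue with its routing key.

    `channel` is expected to behave like pika's BlockingChannel.
    """
    # Assumption: a 'direct' exchange, which delivers a message to the queues
    # whose binding key equals the message's routing key.
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    for queue, routing_key in bindings.items():
        # pika defaults: non-durable, non-exclusive, not auto-deleted
        channel.queue_declare(queue=queue)
        channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
```

Re-declaring with identical arguments is harmless, but if QA or QB already exists with different flags (for example durable=True), RabbitMQ rejects the declaration with PRECONDITION_FAILED, so the flags should match whatever was created beforehand.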

Application that sends messages once


Before getting into connection management, let's confirm that messages published through the exchange actually arrive in queues QA and QB.

import pika

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    # Variant with an explicit heartbeat and a TCP keepalive option:
    #connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, credentials=credentials, heartbeat=60, tcp_options={'TCP_KEEPIDLE': 10}))
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, credentials=credentials))
    channel = connection.channel()

def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    # One message per routing key: rk_A -> QA, rk_B -> QB
    for n in range(2):
        channel.basic_publish(ex, key[n], msg.encode())


connectMQ()
sample_pub()
# Close the connection cleanly once the messages have been sent
connection.close()

<simple_pub.py>

Running this script puts one message in QA and one in QB, as shown in the following figure.




RabbitMQ Connection Timeout


The basic setup is complete. Now let's think about how to keep the connection alive.
First, let's see what happens when the client sends no data at all after connecting, with a heartbeat of 60 seconds.

The following Python script establishes a connection to RabbitMQ and then just waits.

It does not set heartbeat in ConnectionParameters, so the value proposed by the RabbitMQ server is used, which is 60 seconds by default.


import pika
import time

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    # heartbeat is not set, so the server's value (60 s by default) is negotiated
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, credentials=credentials))
    channel = connection.channel()


connectMQ()

# time.sleep() blocks this thread, so pika can neither process incoming
# frames nor send its own heartbeat frames while it waits.
while True:
    time.sleep(1)

<simple_conn.py>


Run this script and open Connections on the RabbitMQ management page: a new connection appears, as shown below. Keep watching it for a while, and you will see the connection disappear after about 3 minutes.


You can also monitor the connection from the shell by combining the watch and netstat commands:

[root@localhost rabbit]# watch -n 5 "netstat -ano |grep 5672"


Every 5.0s: netstat -ano |grep 5672                                 localhost.localdomain: Sat May 20 14:52:27 2023

tcp        0	  0 0.0.0.0:25672           0.0.0.0:*               LISTEN	off (0.00/0/0)
tcp        0	  0 0.0.0.0:15672           0.0.0.0:*               LISTEN	off (0.00/0/0)
tcp        0	  0 127.0.0.1:44892         127.0.0.1:5672          ESTABLISHED off (0.00/0/0)
tcp        0	  0 192.168.150.146:15672   192.168.150.1:19828     ESTABLISHED off (0.00/0/0)
tcp6	   0	  0 :::5672                 :::*                    LISTEN	off (0.00/0/0)
tcp6	   0	  0 127.0.0.1:5672          127.0.0.1:44892         ESTABLISHED off (0.00/0/0)
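If you would rather log this from a script than watch it by eye, the same check can be done in Python. The sketch below parses the text printed by netstat -ano and returns the client-side rows whose remote address ends in :5672, the AMQP port used in this article. established_amqp_connections is a hypothetical helper, and it assumes the Linux netstat column order shown above (proto, Recv-Q, Send-Q, local address, foreign address, state, timer).

```python
def established_amqp_connections(netstat_output, port=5672):
    """Return (local, remote) pairs of ESTABLISHED TCP connections whose remote port is `port`.

    Assumes Linux `netstat -ano` rows: proto, recv-q, send-q, local, foreign, state, timer...
    """
    suffix = ":%d" % port
    pairs = []
    for line in netstat_output.splitlines():
        fields = line.split()  # splits on any run of spaces or tabs
        # Skip headers and anything that is not a tcp/tcp6 row
        if len(fields) < 6 or not fields[0].startswith("tcp"):
            continue
        local, foreign, state = fields[3], fields[4], fields[5]
        # Client side of an AMQP connection: the *remote* end is the broker port
        if state == "ESTABLISHED" and foreign.endswith(suffix):
            pairs.append((local, foreign))
    return pairs
```

On Linux it can be fed with subprocess.run(["netstat", "-ano"], capture_output=True, text=True).stdout and called periodically to log when the client connection disappears.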


As on the management page, the connection disappears after about 3 minutes. A larger heartbeat value would keep an idle connection open longer, but rather than simply raising the heartbeat to a large value, it is better practice for the client to keep exchanging heartbeats with RabbitMQ.

Judging from this test, RabbitMQ appears to close a client's connection once it has received nothing from the client, heartbeat frames included, for roughly three times the heartbeat value.


Therefore, for a client program to keep its connection to RabbitMQ, one of the following two conditions must be satisfied.

  • The client keeps processing and sending heartbeats regularly, well within the heartbeat timeout.
  • The client sends messages often enough that the connection is never silent long enough to be closed (about 3 times the heartbeat value in the test above).



Detecting Disconnection and Reconnecting to MQ

Calling basic_publish, the function that sends messages, on a connection that has been closed raises an exception. By catching that exception, the client can detect the lost connection and reconnect.

import pika
import pika.exceptions
import time

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60, credentials=credentials))
    channel = connection.channel()

def reconnect():
    connectMQ()

index = 1
def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    print("Send message to exchange ...[%d]" % index)
    for n in range(2):
        try:
            channel.basic_publish(ex, key[n], msg.encode())
        except pika.exceptions.AMQPConnectionError:
            # The broker dropped the connection: reconnect, then resend this message
            print("[%d] Connection is closed [pika.exceptions.AMQPConnectionError]" % n)
            reconnect()
            print("==== resend ====")
            channel.basic_publish(ex, key[n], msg.encode())


connectMQ()
print("Wait for disconnection ...")
time.sleep(4 * 60)    # The connection should be dropped during these 4 minutes

# Two publishing rounds: index 1 and index 2
while True:
    sample_pub()
    time.sleep(5)
    if index == 2: break
    index += 1

print("Test end")

<simple_reconnect.py>

The program above handles the exceptions raised by basic_publish. When the connection has been lost, it reconnects and resends the message that failed.


[root@localhost rabbit]# python3 simple_reconnect.py 
Wait for disconnection ...
Send message to exchange ...[1]
[0] Connection is closed [pika.exceptions.AMQPConnectionError]
==== resend ====
Send message to exchange ...[2]
Test end


When you run the program, the connection is dropped after about 3 minutes, so the first basic_publish call, made after the 4-minute wait, raises an exception. The exception handler then reconnects and resends the message.

Checking RabbitMQ afterwards shows that QA and QB each received two messages as expected.



Tips : With BlockingConnection, a closed connection is detected only when you call a function that sends or receives data. In the example above, the disconnection is detected through the exception raised by basic_publish.
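The reconnect-and-resend logic in simple_reconnect.py can be factored into a small reusable function. The sketch below is hypothetical (publish_with_reconnect and its parameters are not part of pika): it takes a callable that returns the current channel, a callable that reconnects (like connectMQ() above), and the exception types that mean "connection lost", which with pika would be (pika.exceptions.AMQPConnectionError,). Passing these in keeps the function free of global variables and easy to test.

```python
def publish_with_reconnect(get_channel, reconnect, exchange, routing_key, body, *, retry_on):
    """Publish a message; if it fails with one of `retry_on`, reconnect and resend once.

    get_channel: callable returning the current channel (called again after reconnecting)
    reconnect:   callable that re-creates the connection and channel
    retry_on:    tuple of exception types that mean "connection lost"
    """
    try:
        get_channel().basic_publish(exchange, routing_key, body)
    except retry_on:
        reconnect()
        # Fetch the new channel; if this second attempt fails, the error propagates.
        get_channel().basic_publish(exchange, routing_key, body)
```

One caveat: if the first attempt actually reached the broker before the failure was noticed, the resend delivers the message twice, so consumers should be able to tolerate duplicates.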


Keeping MQ Connection Using Heartbeat Responses

Reconnecting and resuming work after a disconnection, as in the example above, is important, but it is just as important to prevent disconnection in the first place by handling heartbeats periodically.

In pika's BlockingConnection, the function that processes data and events coming from RabbitMQ, including heartbeats, is process_data_events.

import pika
import time
import threading

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60, credentials=credentials))
    channel = connection.channel()

def reconnect():
    connectMQ()


def do_heartbeat():
    # Service the connection every 30 s, half of the 60 s heartbeat.
    # This runs on a different thread than the one that created the connection.
    # It works here because the main thread never uses the connection, but pika
    # only supports calling it from the connection's own thread.
    while True:
        time.sleep(30)
        print("heartbeat response")
        connection.process_data_events()


connectMQ()

t = threading.Thread(target=do_heartbeat, args=())
t.start()

while True:
    time.sleep(1)

<keep_conn1.py>

The example above calls process_data_events every 30 seconds. Because process_data_events also takes care of heartbeats, the MQ connection no longer drops after 3 minutes.

However, a real application also needs to send messages at arbitrary times, and pika has a few pitfalls to be aware of here.

Caution: pika's basic_publish and process_data_events functions are not thread-safe, so they must never be called from two or more threads at the same time.
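If the application can be organized so that a single thread both publishes and services the connection, this thread-safety problem never arises. The sketch below shows one such design; it is not pika's API. run_publisher, next_message and stop are hypothetical, and next_message is assumed to return (exchange, routing_key, body), or None when there is nothing to send. When idle, the loop calls process_data_events(time_limit=...) instead of time.sleep(), so pika keeps handling heartbeats while it waits.

```python
def run_publisher(connection, channel, next_message, poll_seconds=1.0, stop=lambda: False):
    """Single-threaded publish loop (hypothetical design, not part of pika).

    connection:   object with process_data_events(time_limit=...), e.g. a BlockingConnection
    channel:      object with basic_publish(exchange, routing_key, body)
    next_message: returns (exchange, routing_key, body), or None when idle
    stop:         returns True when the loop should end
    """
    while not stop():
        msg = next_message()
        if msg is None:
            # Idle: let pika process I/O and heartbeats for up to poll_seconds,
            # instead of blocking in time.sleep().
            connection.process_data_events(time_limit=poll_seconds)
        else:
            exchange, routing_key, body = msg
            channel.basic_publish(exchange, routing_key, body)
```

With pika you would pass a BlockingConnection and its channel. The trade-off is that next_message must return quickly; if it blocks for a long time, heartbeats are delayed just as they are with time.sleep().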


Keeping the MQ Connection Alive in a Thread-Safe Way

As mentioned above, pika's process_data_events and basic_publish must not be used from two or more threads at the same time. You can serialize them with a mutex, or use pika.BlockingConnection.add_callback_threadsafe(), which is what pika recommends.


Using mutex : Not recommended

The following example uses a mutex to keep process_data_events and basic_publish from running at the same time. It sends messages every 0.01 seconds and also calls process_data_events every 30 seconds to handle heartbeats. While messages are being sent, the connection stays up even without that heartbeat handling.


import pika
import pika.exceptions
import time
import threading

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None
mtx = threading.Lock()

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60, credentials=credentials))
    channel = connection.channel()

def reconnect():
    connectMQ()


def do_heartbeat():
    while True:
        time.sleep(30)
        print("heartbeat response")
        # The lock keeps this call from overlapping with basic_publish
        with mtx:
            connection.process_data_events()

index = 0
def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    #print("Send message to exchange ...[%d]" % index)
    # "with" releases the lock even if the resend below raises
    with mtx:
        for n in range(2):
            try:
                channel.basic_publish(ex, key[n], msg.encode())
            except pika.exceptions.AMQPConnectionError:
                print("[%d] Connection is closed [pika.exceptions.AMQPConnectionError]" % n)
                reconnect()
                print("==== resend ====")
                channel.basic_publish(ex, key[n], msg.encode())

connectMQ()

t = threading.Thread(target=do_heartbeat, args=())
t.start()

while True:
    sample_pub()
    index += 1
    time.sleep(0.01)

<keep_conn2.py>


The mutex version above works in practice. However, the official pika documentation does not support it: every BlockingConnection operation other than add_callback_threadsafe must run on the connection's own thread, so this approach may break in future versions.


The pika documentation explains:

add_callback_threadsafe(callback)

Requests a call to the given function as soon as possible in the context of this connection’s thread.

NOTE: This is the only thread-safe method in BlockingConnection. All other manipulations of BlockingConnection must be performed from the connection’s thread.

NOTE: the callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events() and BlockingChannel.start_consuming().

For example, a thread may request a call to the BlockingChannel.basic_ack method of a BlockingConnection that is running in a different thread via:

connection.add_callback_threadsafe(
    functools.partial(channel.basic_ack, delivery_tag=...))
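The documentation's example wraps the call in functools.partial rather than a lambda, and that choice matters when callbacks are queued in a loop and executed later, which is exactly what add_callback_threadsafe does. The following pure-Python sketch uses a stand-in for basic_publish (fake_basic_publish and schedule are hypothetical names) and runs the queued callbacks after the loop, the way the connection's thread would.

```python
import functools


def fake_basic_publish(log, exchange, routing_key, body):
    """Stand-in for channel.basic_publish: only records the routing key."""
    log.append(routing_key)


def schedule(keys, use_partial):
    """Queue one publish per routing key, then run the queued callbacks afterwards."""
    log, pending = [], []
    for n in range(len(keys)):
        if use_partial:
            # partial evaluates keys[n] right now and stores the value.
            pending.append(functools.partial(fake_basic_publish, log, "sample_ex", keys[n], b"Hello World"))
        else:
            # The lambda looks up n only when it is called, after the loop has finished.
            pending.append(lambda: fake_basic_publish(log, "sample_ex", keys[n], b"Hello World"))
    # Later, the connection's thread dispatches the queued callbacks.
    for callback in pending:
        callback()
    return log
```

So queuing lambda: channel.basic_publish(ex, key[n], msg.encode()) inside for n in range(2) publishes both messages with routing key rk_B once the callbacks run, while functools.partial sends one with rk_A and one with rk_B.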


Using add_callback_threadsafe: Recommended

Finally, here is the approach that pika recommends. When two or more threads send or receive MQ messages, use the add_callback_threadsafe function so that the pika calls themselves are executed on the connection's thread.

You no longer need a mutex. The thread that created the connection keeps calling process_data_events, which sends and receives data, handles heartbeats, and runs any callbacks queued with add_callback_threadsafe. Other threads never call basic_publish themselves; they hand the call over through add_callback_threadsafe.

import pika
import pika.exceptions
import time
import threading
import functools

credentials = pika.PlainCredentials(username="guest", password="guest")
connection = None
channel = None

def connectMQ():
    global connection, channel
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1", port=5672, heartbeat=60, credentials=credentials))
    channel = connection.channel()

def reconnect():
    connectMQ()

def publish(ex, key, body):
    # Runs inside process_data_events() on the main (connection) thread,
    # so it always uses the channel that is current at that moment.
    channel.basic_publish(ex, key, body)

def sample_pub():
    ex = 'sample_ex'
    key = ['rk_A', 'rk_B']
    msg = 'Hello World'
    for n in range(2):
        try:
            # functools.partial binds key[n] now; a lambda would read n only
            # when it runs later, so both messages would use 'rk_B'.
            connection.add_callback_threadsafe(functools.partial(publish, ex, key[n], msg.encode()))
        except Exception as e:
            # The main thread may be replacing a lost connection at this moment.
            print("[%d] Could not schedule publish: %r" % (n, e))

def do_publish():
    while True:
        sample_pub()
        #time.sleep(0.01)

connectMQ()

t = threading.Thread(target=do_publish, daemon=True)
t.start()

# Only the thread that created the connection drives pika's I/O.
while True:
    try:
        connection.process_data_events(time_limit=1)
    except pika.exceptions.AMQPConnectionError:
        # Callbacks still queued on the lost connection are never run.
        print("Connection is closed [pika.exceptions.AMQPConnectionError]")
        reconnect()

<keep_conn3.py>

I intentionally left the sleep out of the publisher thread to see whether the two threads would cause problems for each other: the publisher thread queues basic_publish calls as fast as it can, while the main thread executes them inside process_data_events. Without any pacing, the queue of pending callbacks can grow faster than it is drained, so a real publisher should limit its rate.

Note : If the publisher thread called basic_publish directly instead of going through add_callback_threadsafe, it would run concurrently with process_data_events in the main thread, and errors are very likely.

Most importantly, functions like basic_publish and process_data_events must run in the context of the thread that created the MQ connection, since pika functions are not safe in a multi-threaded environment.
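The threading contract quoted from the pika documentation can be made concrete with a toy model in plain Python. CallbackLoop is hypothetical and far simpler than pika's real implementation; it only mimics the two documented rules: add_callback_threadsafe may be called from any thread, and queued callbacks run only when the owning thread calls process_data_events().

```python
import queue
import threading


class CallbackLoop:
    """Toy model of BlockingConnection's threading contract (not pika code)."""

    def __init__(self):
        self._pending = queue.Queue()  # queue.Queue is safe to use from any thread

    def add_callback_threadsafe(self, callback):
        # Any thread may call this; the callback is only queued, not run.
        self._pending.put(callback)

    def process_data_events(self):
        # Runs every queued callback on the calling (owner) thread.
        while True:
            try:
                callback = self._pending.get_nowait()
            except queue.Empty:
                return
            callback()


def demo(n_callbacks=100):
    """Queue callbacks from a worker thread, then dispatch them on the calling thread."""
    loop = CallbackLoop()
    ran_on = []  # thread id of each callback that actually ran

    def record():
        ran_on.append(threading.get_ident())

    def worker():
        for _ in range(n_callbacks):
            loop.add_callback_threadsafe(record)

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    ran_before_dispatch = len(ran_on)  # nothing has run yet at this point
    loop.process_data_events()
    return ran_before_dispatch, ran_on
```

This is also why, in pika, some code on the connection's thread has to keep calling process_data_events (or start_consuming): callbacks queued from other threads only run inside those calls.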


Wrapping up

A daemon built on pika can easily run into problems keeping its connection alive. It is important both to keep the connection from breaking in the first place and to restore it when it does break. Using a BlockingConnection-based publisher as the example, we looked at how to keep a connection alive and how to restore it in a multi-threaded environment.

