RabbitMQ - C++ Client #3 : Characteristics and usage of Message Queues
This example does not change much from the previous Hello World. However, consider the case where there are multiple Consumers. When there are multiple consumers, it is a very effective method when message processing takes a lot of time. By distributing processing in multiple consumers, message processing capability can be increased.
Also, if message processing takes a lot of time, it is better to directly process Ack transmission. How to directly send Ack was explained in the previous article RabbitMQ - C++ Client #2.
Work Queues
The example code is slightly different from the home page. In the homepage, the program to send one message was run multiple times, but I sent it several times in one program. So there is no need to run it multiple times.
Sending(Producer)
This python creates queue "task_queue" at runtime and sends 10 messages to the queue.
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='192.168.150.128')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for x in range(10): message = "Hello World! [%d]"%(x) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE, )) print(" [x] Sent %r" % message) connection.close()
<new_task.py>
Receiving(Consumer)
This Python code pulls out messages from the "task_queue" queue repeatedly.
There are three things to look out for in this code.
In the basic_consume function, auto_ack was not set separately. This value defaults to False, as can be seen in the following definition. Therefore, if you do not set this value to True, you must send an ack directly from your code.
basic_consume
(queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None, callback=None)[source]
You can see that the basic_ack function, which directly sends ack in the callback function, has been implemented.
And the basic_qos function. By setting prefetch_count=1 in this function, it informs that new messages will not be received until Ack is sent. Therefore, RabbitMQ broker does not deliver new messages to the consumer until the consumer sends Ack.
To use this option (prefetch_count=1), you must set auto_ack=False. If auto_ack is True, this QoS setting is ignored.
And start_consuming function processes I/O events and dispatches timers and basic_consume callbacks until all consumers are cancelled.
import pika, sys, os import time connection = pika.BlockingConnection( pika.ConnectionParameters(host='192.168.150.128')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode()) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() channel.close()
<worker.py>
First , run 2 consumers(worker.py) and then run the producer(new_task.py). If there are no errors, you should get a result similar to the following.
If there are no errors, you should get a result similar to the following.
python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World! [0]' [x] Done [x] Received 'Hello World! [2]' [x] Done [x] Received 'Hello World! [5]' [x] Done [x] Received 'Hello World! [6]' [x] Done [x] Received 'Hello World! [8]' [x] Done
<first consumer worker.py>
python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World! [1]' [x] Done [x] Received 'Hello World! [3]' [x] Done [x] Received 'Hello World! [4]' [x] Done [x] Received 'Hello World! [7]' [x] Done [x] Received 'Hello World! [9]' [x] Done
<second consumer worker.py>
python new_task.py [x] Sent 'Hello World! [0]' [x] Sent 'Hello World! [1]' [x] Sent 'Hello World! [2]' [x] Sent 'Hello World! [3]' [x] Sent 'Hello World! [4]' [x] Sent 'Hello World! [5]' [x] Sent 'Hello World! [6]' [x] Sent 'Hello World! [7]' [x] Sent 'Hello World! [8]' [x] Sent 'Hello World! [9]'
<producer new_task.py>
The above result can be expressed as the following figure.
The 10 messages created by new_task.py are distributed to consumers in a round-robin manner. However, there is one condition. The consumer must be in the state of sending an ack. If a RabbitMQ broker tries to send a message to a consumer in the round-robin order, and this consumer has not sent an ack for receiving the previous message, the message is delivered to the next broker.
To check this, I will change worker.py to worker2.py and set different ack sending times.
import pika, sys, os import time work_time = int(sys.argv[1]) connection = pika.BlockingConnection( pika.ConnectionParameters(host='192.168.150.128')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode()) time.sleep(work_time) print("sleep %d seconds"%(work_time)) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() channel.close()
<worker2.py>
python worker2.py 2 [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World! [0]' sleep 2 seconds [x] Done [x] Received 'Hello World! [2]' sleep 2 seconds [x] Done [x] Received 'Hello World! [3]' sleep 2 seconds [x] Done [x] Received 'Hello World! [5]' sleep 2 seconds [x] Done [x] Received 'Hello World! [6]' sleep 2 seconds [x] Done [x] Received 'Hello World! [7]' sleep 2 seconds [x] Done [x] Received 'Hello World! [9]' sleep 2 seconds [x] Done
<first consumer worker2.py>
python worker2.py 6 [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World! [1]' sleep 6 seconds [x] Done [x] Received 'Hello World! [4]' sleep 6 seconds [x] Done [x] Received 'Hello World! [8]' sleep 6 seconds [x] Done
<second consumer worker2.py>
python new_task.py [x] Sent 'Hello World! [0]' [x] Sent 'Hello World! [1]' [x] Sent 'Hello World! [2]' [x] Sent 'Hello World! [3]' [x] Sent 'Hello World! [4]' [x] Sent 'Hello World! [5]' [x] Sent 'Hello World! [6]' [x] Sent 'Hello World! [7]' [x] Sent 'Hello World! [8]' [x] Sent 'Hello World! [9]'
<producer new_task.py>
As the processing time in the consumer increases, the time to send an ack is delayed, and the frequency of the consumer being excluded from the round robin order to receive messages increases. Therefore, as shown in the figure above, the message processing time increases and the number of messages that can be received also decreases.
Work Queues - C++
#include <iostream> #include <SimpleAmqpClient/SimpleAmqpClient.h> using namespace std; using namespace AmqpClient; /* All pointers are boost's smart pointers, so if the "ptr_t" variable excapes the scope, all the memories are freed and the file descripters are closed automatically. */ int main(int argc, char * argv[]) { Channel::ptr_t channel = Channel::Create("192.168.150.128"); //DeclareQueue(const std::string & queue_name, bool passive = false, bool durable = false, bool exclusive = true, bool auto_delete = true) channel->DeclareQueue("task_queue", false, true, false, false); char szmsg[128]; try{ for(int x = 0; x < 10; x++){ sprintf(szmsg, "Hello World! [%d]", x); BasicMessage::ptr_t msg = BasicMessage::Create(); msg->Body((string)szmsg); msg->DeliveryMode(AmqpClient::BasicMessage::delivery_mode_t::dm_persistent); channel->BasicPublish("", "task_queue", msg); printf(" [x] Sent %s\n", szmsg); } } catch(MessageReturnedException &e){ std::cout << "Message delivery error: " << e.what() << std::endl; } return 0; }
<new_task.cpp>
#include <stdio.h> #include <iostream> #include <unistd.h> #include <signal.h> #include <SimpleAmqpClient/SimpleAmqpClient.h> using namespace std; using namespace AmqpClient; bool g_loop = true; void sig_handler(int signo) { if (SIGINT == signo) g_loop = false; } int main(int argc, char * argv[]) { struct sigaction act; act.sa_handler = sig_handler; sigaction(SIGINT, &act, NULL); if(argc < 2){ cout << "invalid parameter" <<endl; return 0; } int work_time = atoi(argv[1]); Channel::ptr_t channel = Channel::Create("192.168.150.128"); channel->DeclareQueue("task_queue", false, true, false, false); //BasicConsume(const std::string &queue, const std::string &consumer_tag = "", bool no_local = true, bool no_ack = true, bool exclusive = true, uint16_t message_prefetch_count = (uint16_t)1U) channel->BasicConsume("task_queue", "task_queue", true, false, false); channel->BasicQos("task_queue", 1); //channel->BasicConsume("hello","hello"); cout << " [*] Waiting for messages. To exit press Ctrl + C" <<endl; while(g_loop){ try{ Envelope::ptr_t envelope; bool bflag = channel->BasicConsumeMessage("task_queue", envelope, 1000); if(bflag == false){ //time out //fprintf(stderr, "."); continue; } fprintf(stderr, " [x] Received %s\n",envelope->Message()->Body().c_str() ); sleep(work_time); fprintf(stderr, "sleep %d seconds\n", work_time); fprintf(stderr, " [x] Done\n"); Envelope::DeliveryInfo info; info.delivery_tag = envelope->DeliveryTag(); info.delivery_channel = envelope->DeliveryChannel(); channel->BasicAck(info); } catch(MessageReturnedException &e){ fprintf(stderr, " Message receive error\n" ); } } cout << "\n Goob Bye" << endl; return 0; }
<worker2.cpp>
Compile with the following command.
g++ new_task.cpp -lrabbitmq -lSimpleAmqpClient -o new_tas
g++ worker2.cpp -lrabbitmq -lSimpleAmqpClient -o worker2
If you test it the same way as the Python example, you will get the same result.
The source code can be downloaded from my Github.
댓글
댓글 쓰기