RabbitMQ - C++ Client #2 : Basic "Hello World" example of RabbitMQ

The https://www.rabbitmq.com/getstarted.html page provides examples for developing RabbitMQ applications in various languages.


Unfortunately, C/C++ examples are not provided. In this series, I implement several of the examples from that page using SimpleAmqpClient, which was introduced in the previous article.

I will port the Python code to C++. In the tutorial's diagrams, P and C mean the following.

  • P: the producer, which creates messages
  • C: the consumer, which consumes messages

Hello World



Sending(Producer)

This Python code creates the queue "hello" at runtime and sends a message to it. As mentioned in the previous article, declaring queues at runtime is not recommended for actual application development. If possible, create the queue in advance on the management page.


For reference, the default parameters of the queue_declare function are as follows.

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None, callback=None)


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.150.128'))
channel = connection.channel()
channel.queue_declare(queue='hello')  # Creates the "hello" queue if it does not exist yet
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
connection.close()  # Flush pending network data and close the connection cleanly

<sender.py>

If you run sender.py and check the management page, you will see that the hello queue has been created and holds one message.


Receiving(Consumer)

This Python code pulls messages from the hello queue.

import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.150.128'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)


If you run this Python code, you can see messages being pulled from the hello queue, as shown below. The program blocks and keeps waiting until the user presses Ctrl+C, pulling messages from the queue as they arrive. Therefore, each time sender.py is run again, another message is received.

python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'
 [x] Received b'Hello World!'
Interrupted


Hello World - C++

Sending(Producer)

The function that creates a queue in SimpleAmqpClient is Channel::DeclareQueue. Its description is as follows. Note that exclusive and auto_delete default to true here, whereas they default to False in Python's queue_declare, so the C++ code below passes every flag explicitly to match the Python call.

std::string AmqpClient::Channel::DeclareQueue(const std::string &queue_name, bool passive = false, bool durable = false, bool exclusive = true, bool auto_delete = true)

Declares a queue. Creates a queue on the AMQP broker if it does not already exist.

Parameters:
queue_name – The desired name of the queue. If this is an empty string, the broker will generate a queue name that this method will return.
passive – Indicates how the broker should react if the queue does not exist. The broker will raise an error if the queue doesn't already exist and passive is `true`. With passive `false` (the default), the queue gets created automatically, if needed.
durable – Indicates whether the queue is durable, i.e., whether it will survive a broker restart.
exclusive – Indicates that only this client can use the queue. Defaults to true. An exclusive queue is deleted when the connection is closed.
auto_delete – Indicates that the queue is deleted automatically once it has had at least one consumer and the last consumer has unsubscribed. Defaults to true.

Returns:
The name of the queue created on the broker. Used mostly when the broker is asked to create a unique queue by not providing a queue name.



#include <iostream>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

/*
All pointers are Boost smart pointers, so when a "ptr_t" variable goes out of scope, the memory is freed and the file descriptors are closed automatically.
*/
int main(int argc, char * argv[])
{
    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    // Pass every flag explicitly: exclusive and auto_delete default to true.
    channel->DeclareQueue("hello", false, false, false, false);

    try{
        BasicMessage::ptr_t msg = BasicMessage::Create();
        msg->Body((string)"Hello World!");
        channel->BasicPublish("", "hello", msg);
    }
    catch(MessageReturnedException &e){
        std::cout << "Message delivery error: " << e.what() << std::endl;
    }
    return 0;
}

<sender.cpp>

You can build it with the following command. Link the RabbitMQ library using the -lrabbitmq -lSimpleAmqpClient link option.

g++ sender.cpp -lrabbitmq -lSimpleAmqpClient -o sender


Receiving(Consumer)

Unlike the Python client, SimpleAmqpClient does not deliver messages through a callback. BasicConsume only registers the consumer; messages are then fetched by calling BasicConsumeMessage, which waits up to a given timeout. The description of BasicConsume is as follows. The arguments used in the code are chosen to match the basic_consume call in the Python code.

The parameter to look at carefully is the fourth one, no_ack. If it is true, the broker treats a message as acknowledged as soon as it is delivered, so the application never has to send an ack. If it is false, the application must acknowledge each message itself with BasicAck; with the default message_prefetch_count of 1, the broker does not deliver the next message until the previous one has been acknowledged.

Therefore, the no_ack parameter of SimpleAmqpClient's BasicConsume function plays the same role as auto_ack in the Python basic_consume function.

std::string AmqpClient::Channel::BasicConsume(const std::string &queue, const std::string &consumer_tag = "", bool no_local = true, bool no_ack = true, bool exclusive = true, uint16_t message_prefetch_count = (uint16_t)1U)

Starts consuming Basic messages on a queue. Subscribes as a consumer to a queue, so all future messages on a queue will be Basic.Delivered.

Parameters:
queue – The name of the queue to subscribe to.
consumer_tag – The name of the consumer. This is used to do operations with a consumer.
no_local – Defaults to true.
no_ack – If `true`, ack'ing the message is automatically done when the message is delivered. Defaults to `true` (message does not have to be ack'ed).
exclusive – Means only this consumer can access the queue.
message_prefetch_count – Number of unacked messages the broker will deliver. Setting this to more than 1 will allow the broker to deliver messages while a current message is being processed. A value of 0 means no limit. This option is ignored if `no_ack = true`.

Returns:
The consumer tag.
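To make the difference between the two no_ack settings more concrete, here is a toy in-memory model. It does not use RabbitMQ or SimpleAmqpClient at all: ToyQueue and all of its member functions are hypothetical names invented for this illustration. It only mimics the broker-side rule, under the assumption that a lost consumer causes unacknowledged messages to be put back on the queue: with no_ack = true a delivered message is forgotten immediately, while with no_ack = false it is kept as "unacked" until the consumer acknowledges it.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>

// Toy stand-in for a broker queue (hypothetical; not part of any AMQP library).
class ToyQueue {
public:
    explicit ToyQueue(bool no_ack) : no_ack_(no_ack) {}

    void publish(const std::string& body) { ready_.push_back(body); }

    // Delivers the next ready message. Returns false if the queue is empty,
    // otherwise fills in the delivery tag and the message body.
    bool deliver(std::uint64_t& tag, std::string& body) {
        if (ready_.empty()) return false;
        body = ready_.front();
        ready_.pop_front();
        tag = ++next_tag_;
        if (!no_ack_) unacked_[tag] = body;  // keep it until it is acked
        return true;
    }

    // Acknowledges one delivery. Returns false if the tag is not outstanding.
    bool ack(std::uint64_t tag) { return unacked_.erase(tag) == 1; }

    // The consumer went away: every unacked message returns to the front
    // of the queue, oldest first.
    void consumer_lost() {
        for (auto it = unacked_.rbegin(); it != unacked_.rend(); ++it)
            ready_.push_front(it->second);
        unacked_.clear();
    }

    std::size_t ready_count() const { return ready_.size(); }
    std::size_t unacked_count() const { return unacked_.size(); }

private:
    bool no_ack_;
    std::uint64_t next_tag_ = 0;
    std::deque<std::string> ready_;
    std::map<std::uint64_t, std::string> unacked_;
};
```

With ToyQueue(true), a message that was delivered just before consumer_lost() is gone for good. With ToyQueue(false), the same message is back in the ready list and will be delivered again, which is the reason to choose manual acks when losing a message is not acceptable.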


#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <signal.h>

#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

// volatile sig_atomic_t is a type that can be written safely from a signal handler
volatile sig_atomic_t g_loop = 1;
void sig_handler(int signo)
{
    if (SIGINT == signo) g_loop = 0;
}

int main(int argc, char * argv[])
{
    struct sigaction act = {};   // zero-initialize so that sa_flags holds no garbage
    act.sa_handler = sig_handler;
    sigemptyset(&act.sa_mask);
    sigaction(SIGINT, &act, NULL);

    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    channel->DeclareQueue("hello", false, false, false, false);
    channel->BasicConsume("hello", "hello", true, true, false);
    //channel->BasicConsume("hello","hello");
    cout << " [*] Waiting for messages. To exit press Ctrl + C" <<endl;
    while(g_loop){
        try{
            Envelope::ptr_t envelope;
            bool bflag = channel->BasicConsumeMessage("hello", envelope, 1000);
            if(bflag == false){  //time out
                fprintf(stderr, ".");
                continue;
            }
            
            fprintf(stderr, " [x] Received %s\n",envelope->Message()->Body().c_str() );
        }
        catch(MessageReturnedException &e){
            fprintf(stderr, " Message receive error\n" );
        }
    }
    cout << "\n Good Bye" << endl;
    return 0;
}

<receive.cpp>
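The shape of this receive loop, a timed poll plus a flag set from a signal handler, can also be sketched without a broker. The block below is only an illustration under stated assumptions: PollFn, HandleFn, on_sigint, g_stop and run_until_interrupted are hypothetical names, and the poll function is a stand-in for a timed receive such as BasicConsumeMessage. It assumes a POSIX system for sigaction.

```cpp
#include <signal.h>    // POSIX: sigaction, sigemptyset, sig_atomic_t
#include <functional>
#include <string>

// Written by the signal handler, read by the loop.
volatile sig_atomic_t g_stop = 0;

extern "C" void on_sigint(int) { g_stop = 1; }

// Hypothetical stand-in for a timed receive: returns true and fills `body`
// when a message arrived within timeout_ms, false on timeout.
using PollFn = std::function<bool(std::string& body, int timeout_ms)>;
using HandleFn = std::function<void(const std::string& body)>;

// Polls until SIGINT is received; returns the number of messages handled.
int run_until_interrupted(const PollFn& poll, const HandleFn& handle)
{
    struct sigaction act = {};       // zero-initialize every field
    act.sa_handler = on_sigint;
    sigemptyset(&act.sa_mask);
    sigaction(SIGINT, &act, nullptr);

    int handled = 0;
    while (!g_stop) {
        std::string body;
        if (!poll(body, 1000)) continue;  // timeout: loop back and re-check the flag
        handle(body);
        ++handled;
    }
    return handled;
}
```

In a real consumer, the poll argument would be a lambda that calls BasicConsumeMessage and copies the body out of the envelope. The timeout is what keeps the loop responsive: a receive that blocks forever would never get the chance to look at the flag again after Ctrl+C.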


If you want to send the ack yourself, pass false for no_ack and call BasicAck after each message has been processed, as follows. For a simple example like this one, there is not much reason to do so.

#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <signal.h>

#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

// volatile sig_atomic_t is a type that can be written safely from a signal handler
volatile sig_atomic_t g_loop = 1;
void sig_handler(int signo)
{
    if (SIGINT == signo) g_loop = 0;
}

int main(int argc, char * argv[])
{
    struct sigaction act = {};   // zero-initialize so that sa_flags holds no garbage
    act.sa_handler = sig_handler;
    sigemptyset(&act.sa_mask);
    sigaction(SIGINT, &act, NULL);

    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    channel->DeclareQueue("hello", false, false, false, false);
    channel->BasicConsume("hello", "hello", true, false, false);
    //channel->BasicConsume("hello","hello");
    cout << " [*] Waiting for messages. To exit press Ctrl + C" <<endl;
    while(g_loop){
        try{
            Envelope::ptr_t envelope;
            bool bflag = channel->BasicConsumeMessage("hello", envelope, 1000);
            if(bflag == false){  //time out
                fprintf(stderr, ".");
                continue;
            }
            
            fprintf(stderr, " [x] Received %s\n",envelope->Message()->Body().c_str() );
            
            Envelope::DeliveryInfo info;
            info.delivery_tag = envelope->DeliveryTag();
            info.delivery_channel = envelope->DeliveryChannel();
            channel->BasicAck(info);
        }
        catch(MessageReturnedException &e){
            fprintf(stderr, " Message receive error\n" );
        }
    }
    cout << "\n Good Bye" << endl;
    return 0;
}

<manually send ack>


You can build it with the following command. Link the RabbitMQ library using the -lrabbitmq -lSimpleAmqpClient link option.

g++ receive.cpp -lrabbitmq -lSimpleAmqpClient -o receive


Wrapping up

I implemented the first example provided by RabbitMQ in C++. Next, I will implement the second example Work Queues in C++.

The source code can be downloaded from my GitHub.



