RabbitMQ - C++ Client (Exchange) #4 : How to use the 4 types of Exchange

This post follows the third tutorial, Publish/Subscribe, on the https://www.rabbitmq.com/getstarted.html page.

Some of the content is quoted from "RabbitMQ in Depth" published in 2018 by Manning Press.


Publish/Subscribe


The difference from the previous example is that X (Exchange) exists between Publisher and Subscriber.

Most real-world projects publish messages to an exchange rather than putting them directly into a queue. An exchange can route a single message to multiple bound queues at once. It also lets you add or change the queues bound to the exchange from the RabbitMQ management page without modifying the Publisher program.


The subscribers in the original tutorial source code create queues and bind them to the exchange at runtime. I prefer not to do it this way. Instead, I will create the exchanges and queues in the admin page beforehand and bind them there.


<Pre-created Exchanges and Queues>



emit_log(Producer)


#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.150.128'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

<emit_log.py>


In most cases a subscriber only needs to know the queue it reads from, not the exchange. So I'll tweak the example code so that it does not use the exchange.

receive_logs(Subscriber)

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

<receive_logs.py>


$ python emit_log.py
 [x] Sent 'info: Hello World!'


$ python receive_logs.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'info: Hello World!'
 [x] Done
 

Using the partially modified Python program, you can confirm that a message published to the exchange is delivered to the bound queue and consumed by the Subscriber.

I didn't check the logB queue, but as long as it is bound to the same exchange, the same message should be stored there.


Publish/Subscribe - C++


emit_log(Producer)

#include <iostream>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

/*
All pointers are boost's smart pointers, so when a "ptr_t" variable goes out of scope, the memory is freed and the file descriptors are closed automatically.
*/
int main(int argc, char * argv[])
{
    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    //void DeclareExchange(const string &exchange_name, const string &exchange_type = Channel::EXCHANGE_TYPE_DIRECT, bool passive = false, bool durable = false, bool auto_delete = false)
    channel->DeclareExchange("logs", Channel::EXCHANGE_TYPE_FANOUT, false, true);
    try{
        BasicMessage::ptr_t msg = BasicMessage::Create();
        msg->Body((string)"Hello World!");
        channel->BasicPublish("logs", "",  msg);
        cout << "SENT to exchange[logs]:" << msg->Body() << endl;
    }
    catch(MessageReturnedException &e){
        std::cout << "Message delivery error: " << e.what() << std::endl;
    }
    return 0;
}

<emit_log.cpp>


receive_logs(Subscriber)

#include <stdio.h>
#include <iostream>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

int main(int argc, char * argv[])
{
    //Channel::ptr_t connection = Channel::Create("192.168.150.128", 5672, username, password, vhost, (int)frame_max);
    Channel::ptr_t connection = Channel::Create("192.168.150.128", 5672);

    connection->BasicConsume("logA", "logA");
    int timeout = 5000;  //milliseconds
    while(true){
        try{
            Envelope::ptr_t envelope;
            bool bflag = connection->BasicConsumeMessage("logA", envelope, timeout);
            if(bflag == false){  //time out
                cout << "time out => continue" << endl;
                continue;
            }
            cout << "RCV:" << envelope->Message()->Body() << endl;
        }
        catch(MessageReturnedException &e){
            std::cout << "Message receive error: " << e.what() << std::endl;
        }
    }
    return 0;
}

<receive_logs.cpp>


Compile and run the programs to get the same result.

$ ./emit_log
SENT to exchange[logs]:Hello World!
$ ./emit_log
SENT to exchange[logs]:Hello World!


$ ./receive_logs                                                  
RCV:Hello World!
time out => continue
RCV:Hello World!
time out => continue


Under the Hood (more about Exchange)

RabbitMQ provides four built-in exchange types.

  • Direct exchange
  • Fanout exchange
  • Topic exchange
  • Headers exchange


Direct exchange

A direct exchange routes messages by routing key. Among the queues bound to the exchange, only those whose binding key exactly matches the routing key of the message receive it.
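
To make the rule concrete, here is a minimal in-memory sketch of direct routing. It does not talk to a broker and is not part of SimpleAmqpClient; the DirectExchange struct and make_directX helper are hypothetical names I made up for illustration. The bindings mirror the test setup used later in this section (QA with rk_A, QB with rk_A and rk_B, QC with rk_B).

```cpp
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory model of a direct exchange (illustration only).
struct DirectExchange {
    // Each binding is a (queue name, binding key) pair.
    std::vector<std::pair<std::string, std::string>> bindings;

    void bind(const std::string &queue, const std::string &binding_key) {
        bindings.emplace_back(queue, binding_key);
    }

    // Returns the queues that would receive a message published with routing_key.
    // A queue is selected only when its binding key equals the routing key exactly.
    std::set<std::string> route(const std::string &routing_key) const {
        std::set<std::string> targets;
        for (const auto &b : bindings) {
            if (b.second == routing_key) targets.insert(b.first);
        }
        return targets;
    }
};

// Bindings assumed from this article's test setup.
inline DirectExchange make_directX() {
    DirectExchange x;
    x.bind("QA", "rk_A");
    x.bind("QB", "rk_A");
    x.bind("QB", "rk_B");
    x.bind("QC", "rk_B");
    return x;
}
```

With these bindings, route("rk_A") selects QA and QB, route("rk_B") selects QB and QC, and a key with no matching binding selects nothing.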

<direct exchange>


The direct type can be selected when creating an exchange in the management screen. It is also the default type (for example, DeclareExchange defaults to Channel::EXCHANGE_TYPE_DIRECT), so an exchange created without specifying a type is a direct exchange.


Create direct type exchange

For testing purposes, we will create a simple direct type exchange named directX. Note that the logs exchange used earlier is declared as a fanout exchange, not a direct one.

<direct type exchange creation>


The queues are bound to the exchange with the following routing keys.


<binding and queue information>

Now that the exchange, queues, and bindings for testing the direct exchange are prepared, let's create a test program.


This program sends messages to the directX exchange one by one using the rk_A and rk_B routing keys.

#include <iostream>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

/*
All pointers are boost's smart pointers, so when a "ptr_t" variable goes out of scope, the memory is freed and the file descriptors are closed automatically.
*/
int main(int argc, char * argv[])
{
    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    //void DeclareExchange(const string &exchange_name, const string &exchange_type = Channel::EXCHANGE_TYPE_DIRECT, bool passive = false, bool durable = false, bool auto_delete = false)
    channel->DeclareExchange("directX", Channel::EXCHANGE_TYPE_DIRECT, false, true);
    try{
        BasicMessage::ptr_t msg = BasicMessage::Create();
        msg->Body((string)"Hello World with routingkey:rk_A");
        channel->BasicPublish("directX", "rk_A",  msg);
        cout << "SENT to exchange[directX]:" << msg->Body() << endl;

        msg->Body((string)"Hello World with routingkey:rk_B");
        channel->BasicPublish("directX", "rk_B",  msg);
        cout << "SENT to exchange[directX]:" << msg->Body() << endl;

    }
    catch(MessageReturnedException &e){
        std::cout << "Message delivery error: " << e.what() << std::endl;
    }
    return 0;
}

<direct_publisher.cpp>


This program uses threads to read messages from three queues (QA, QB, QC). Therefore, -lpthread must be added to the link options.

#include <stdio.h>
#include <unistd.h>   //sleep
#include <iostream>
#include <pthread.h>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;
void *consumer_thread(void *data);

const char * routing_key[] = { "rk_A","rk_A,rk_B", "rk_B"};
const char * queue[] = { "QA","QB", "QC"};

int main(int argc, char * argv[])
{
    //for testing 3 queue (QA, QB, QC), I'll make 3 threads.
    pthread_t p_thread;
    pthread_create(&p_thread, NULL, consumer_thread, (void *)0);
    pthread_detach(p_thread);

    pthread_create(&p_thread, NULL, consumer_thread, (void *)1);
    pthread_detach(p_thread);

    pthread_create(&p_thread, NULL, consumer_thread, (void *)2);
    pthread_detach(p_thread);

    while(1) sleep(1);
}

void *consumer_thread(void *data)
{
    int index = (int)(long)data;  //the queue index was passed by value inside the pointer
    printf("index:%d\n", index);
    sleep(1);
    
    Channel::ptr_t connection = Channel::Create("192.168.150.128", 5672);
    connection->BasicConsume(queue[index], queue[index]);
    int timeout = 5000;  //milliseconds
    while(true){
        try{
            Envelope::ptr_t envelope;
            bool bflag = connection->BasicConsumeMessage(queue[index], envelope, timeout);
            if(bflag == false){  //time out
                cout << "time out => continue" << endl;
                continue;
            }
            cout << "Q[" << queue[index] << "] RCV:" << envelope->Message()->Body() << endl;
        }
        catch(MessageReturnedException &e){
            std::cout << "Message receive error: " << e.what() << std::endl;
        }
    }

}

<direct_consumer.cpp>


QB is bound with both rk_A and rk_B, so both messages are routed to it. QA and QC each receive only the message whose routing key matches their binding.

# g++ direct_publisher.cpp -lrabbitmq -lSimpleAmqpClient -o direct_publisher
# ./direct_publisher
SENT to exchange[directX]:Hello World with routingkey:rk_A
SENT to exchange[directX]:Hello World with routingkey:rk_B


# g++ direct_consumer.cpp -lrabbitmq -lSimpleAmqpClient -lpthread -o direct_consumer
# ./direct_consumer
index:0
index:1
index:2
time out => continue
time out => continue
time out => continue
time out => continue
Q[QB] RCV:Hello World with routingkey:rk_A
Q[QA] RCV:Hello World with routingkey:rk_A
Q[QC] RCV:Hello World with routingkey:rk_B



Fanout exchange

Where a direct exchange allows for queues to receive targeted messages, a fanout exchange doesn’t discriminate. All messages published through a fanout exchange are delivered to all queues bound to it. This method has a slight speed advantage because RabbitMQ does not have to check the routing key. Most of the previous examples were tested with a fanout exchange.

<fanout exchange>

I intentionally added a routing key. However, these values will be ignored by RabbitMQ and messages will be delivered to the bound queues regardless of these values.
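
As a rough illustration of why the binding keys do not matter here, the following in-memory sketch models fanout routing. It does not talk to a broker; FanoutExchange is a hypothetical name, and the binding keys in the usage below are assumptions chosen to resemble the direct exchange test.

```cpp
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory model of a fanout exchange (illustration only).
struct FanoutExchange {
    // Binding keys are stored but never consulted.
    std::vector<std::pair<std::string, std::string>> bindings;

    void bind(const std::string &queue, const std::string &binding_key) {
        bindings.emplace_back(queue, binding_key);
    }

    // The routing key is ignored: every bound queue is selected.
    // A queue bound more than once still appears only once in the result.
    std::set<std::string> route(const std::string & /*routing_key*/) const {
        std::set<std::string> targets;
        for (const auto &b : bindings) targets.insert(b.first);
        return targets;
    }
};
```

If QA, QB, and QC are bound with any keys at all, route("rk_A"), route("rk_B"), and route("") all return the same three queues.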


This program is a slight modification of direct_publisher.cpp. The routing key is not needed, but it is left in to show that RabbitMQ ignores it.

#include <iostream>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

/*
All pointers are boost's smart pointers, so when a "ptr_t" variable goes out of scope, the memory is freed and the file descriptors are closed automatically.
*/
int main(int argc, char * argv[])
{
    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    //void DeclareExchange(const string &exchange_name, const string &exchange_type = Channel::EXCHANGE_TYPE_DIRECT, bool passive = false, bool durable = false, bool auto_delete = false)
    channel->DeclareExchange("fanoutX", Channel::EXCHANGE_TYPE_FANOUT, false, true);
    try{
        BasicMessage::ptr_t msg = BasicMessage::Create();
        msg->Body((string)"Hello World with routingkey:rk_A");
        channel->BasicPublish("fanoutX", "rk_A",  msg);
        cout << "SENT to exchange[fanoutX]:" << msg->Body() << endl;

        msg->Body((string)"Hello World with routingkey:rk_B");
        channel->BasicPublish("fanoutX", "rk_B",  msg);
        cout << "SENT to exchange[fanoutX]:" << msg->Body() << endl;

    }
    catch(MessageReturnedException &e){
        std::cout << "Message delivery error: " << e.what() << std::endl;
    }
    return 0;
}

<fanout_publisher.cpp>

The consumer program is direct_consumer, used as it is. There is nothing to change.

You can see that every queue receives both messages.

# g++ fanout_publisher.cpp -lrabbitmq -lSimpleAmqpClient -lpthread -o fanout_publisher
# ./fanout_publisher
SENT to exchange[fanoutX]:Hello World with routingkey:rk_A
SENT to exchange[fanoutX]:Hello World with routingkey:rk_B


# ./direct_consumer
index:0
index:1
index:2
time out => continue
time out => continue
time out => continue

time out => continue
time out => continue
time out => continue
Q[QA] RCV:Hello World with routingkey:rk_A
Q[QC] RCV:Hello World with routingkey:rk_A
Q[QB] RCV:Hello World with routingkey:rk_A
Q[QA] RCV:Hello World with routingkey:rk_B
Q[QC] RCV:Hello World with routingkey:rk_B
Q[QB] RCV:Hello World with routingkey:rk_B


Topic exchange

Like direct exchanges, topic exchanges will route messages to any queue bound with a matching routing key. But by using a period-delimited format, queues may bind to routing keys using wildcard-based pattern matching. By using the asterisk (*) and pound (#) characters, you can match specific parts of the routing key or even multiple parts at the same time. An asterisk will match all characters up to a period in the routing key, and the pound character will match all characters that follow, including any subsequent periods. 

MQTT topics use a similar scheme, with / as the level separator and + and # as the wildcards.





Because routing keys can be organized like a directory path, a topic exchange is well suited to routing messages that have a hierarchical structure.
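
The wildcard rules can be sketched in a few lines of C++. This is only my illustration of the matching rule, not RabbitMQ's actual implementation: it treats the key as dot-separated words, lets * stand for exactly one word, and lets # stand for zero or more words. The function names split_words, match_from, and topic_matches are hypothetical, and the binding patterns in the usage note are made-up examples.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Split a dot-delimited key such as "animal.bird.medium" into words.
std::vector<std::string> split_words(const std::string &key) {
    std::vector<std::string> words;
    std::string word;
    std::istringstream in(key);
    while (std::getline(in, word, '.')) words.push_back(word);
    return words;
}

// Match pattern words p[pi..] against routing key words k[ki..].
bool match_from(const std::vector<std::string> &p, std::size_t pi,
                const std::vector<std::string> &k, std::size_t ki) {
    if (pi == p.size()) return ki == k.size();  // pattern used up: key must be too
    if (p[pi] == "#") {
        // '#' may absorb zero or more words, so try every possible length.
        for (std::size_t next = ki; next <= k.size(); ++next) {
            if (match_from(p, pi + 1, k, next)) return true;
        }
        return false;
    }
    if (ki == k.size()) return false;  // pattern still expects a word
    if (p[pi] == "*" || p[pi] == k[ki]) return match_from(p, pi + 1, k, ki + 1);
    return false;
}

// True when a queue bound with `pattern` would receive `routing_key`.
bool topic_matches(const std::string &pattern, const std::string &routing_key) {
    return match_from(split_words(pattern), 0, split_words(routing_key), 0);
}
```

For the routing key animal.bird.medium, patterns such as animal.#, animal.*.medium, and *.bird.* match, while animal.* does not, because * covers only a single word.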




I configured the exchange like this.





#include <iostream>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;

/*
All pointers are boost's smart pointers, so when a "ptr_t" variable goes out of scope, the memory is freed and the file descriptors are closed automatically.
*/
int main(int argc, char * argv[])
{
    Channel::ptr_t channel = Channel::Create("192.168.150.128");
    //void DeclareExchange(const string &exchange_name, const string &exchange_type = Channel::EXCHANGE_TYPE_DIRECT, bool passive = false, bool durable = false, bool auto_delete = false)
    channel->DeclareExchange("topicX", Channel::EXCHANGE_TYPE_TOPIC, false, true);
    try{
        BasicMessage::ptr_t msg = BasicMessage::Create();
        msg->Body((string)"Hello World with routingkey:animal.bird.medium");
        channel->BasicPublish("topicX", "animal.bird.medium",  msg);
        cout << "SENT to exchange[topicX]:" << msg->Body() << endl;
    }
    catch(MessageReturnedException &e){
        std::cout << "Message delivery error: " << e.what() << std::endl;
    }
    return 0;
}

<topic_publisher.cpp>


#include <stdio.h>
#include <unistd.h>   //sleep, usleep
#include <iostream>
#include <pthread.h>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace std;
using namespace AmqpClient;
void *consumer_thread(void *data);

const char * queue[] = { "QA","QB", "QC", "QD", "QE", "QF"};

int main(int argc, char * argv[])
{
    pthread_t p_thread;
    for(int x = 0; x < 6; x++){
        pthread_create(&p_thread, NULL, consumer_thread, (void *)(long)x);
        pthread_detach(p_thread);
    }

    while(1) sleep(1);
}

void *consumer_thread(void *data)
{
    int index = (int)(long)data;  //the queue index was passed by value inside the pointer
    printf("index:%d\n", index);
    sleep(1);
    
    Channel::ptr_t connection = Channel::Create("192.168.150.128", 5672);
    connection->BasicConsume(queue[index], queue[index]);
    int timeout = 5000;
    while(true){
        try{
            Envelope::ptr_t envelope;
            bool bflag = connection->BasicConsumeMessage(queue[index], envelope, timeout);
            if(bflag == false){  //time out
                cout << "time out => continue" << endl;
                continue;
            }
            usleep(index * 1000);
            cout << "Q[" << queue[index] << "] RCV:" << envelope->Message()->Body() << endl;
        }
        catch(MessageReturnedException &e){
            std::cout << "Message receive error: " << e.what() << std::endl;
        }
    }

}

<topic_consumer.cpp>


You can see that QA, QB, QC, and QD consume the message.

# g++ topic_publisher.cpp -lrabbitmq -lSimpleAmqpClient -lpthread -o topic_publisher
# ./topic_publisher
SENT to exchange[topicX]:Hello World with routingkey:animal.bird.medium

# g++ topic_consumer.cpp -lrabbitmq -lSimpleAmqpClient -lpthread -o topic_consumer
# ./topic_consumer
index:0
index:1
index:3
index:4
index:5
index:2
Q[QA] RCV:Hello World with routingkey:animal.bird.medium
Q[QB] RCV:Hello World with routingkey:animal.bird.medium
Q[QC] RCV:Hello World with routingkey:animal.bird.medium
Q[QD] RCV:Hello World with routingkey:animal.bird.medium
time out => continue
time out => continue
time out => continue



Headers exchange

The fourth built-in exchange type is the headers exchange. It allows for arbitrary routing in RabbitMQ by using the headers table in the message properties. Queues that are bound to the headers exchange use the Queue.Bind arguments parameter to pass in an array of key/value pairs to route on and an x-match argument. The x-match argument is a string value that’s set to any or all. If the value is any, messages will be routed if any of the headers table values match any of the binding values. If the value of x-match is all, all values passed in as Queue.Bind arguments must be matched. This doesn’t preclude the message from having additional key/value pairs in the headers table. 
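
The any/all rule can be sketched as a small C++ function. This is a simplified illustration under my own assumptions, not broker code: header values are plain strings here, while real header tables carry typed values. The Headers alias, the headers_match function, and the header names in the usage note are all hypothetical.

```cpp
#include <cstddef>
#include <map>
#include <string>

using Headers = std::map<std::string, std::string>;

// binding_args: the key/value pairs given at bind time (x-match excluded).
// x_match: "any" or "all"; anything other than "any" is treated as "all" here.
// message_headers: the headers table of the published message.
bool headers_match(const Headers &binding_args, const std::string &x_match,
                   const Headers &message_headers) {
    std::size_t matched = 0;
    for (const auto &kv : binding_args) {
        auto it = message_headers.find(kv.first);
        if (it != message_headers.end() && it->second == kv.second) ++matched;
    }
    if (x_match == "any") return matched > 0;  // one matching pair is enough
    return matched == binding_args.size();     // every binding pair must match
}
```

For example, with binding arguments format=pdf and type=report, a message carrying format=pdf and type=log matches under any but not under all. Extra headers on the message never prevent a match.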

However, SimpleAmqpClient does not seem to support the headers exchange yet. If you want to use a headers exchange, use another language such as Python or C#, or find a C/C++ library other than SimpleAmqpClient.


The source code can be downloaded from my GitHub.



