5.4.3. Reading Messages from the Queue
The second program in the publish/subscribe example, topic_listener.cpp, creates a listener and, for each of the four binding keys it specifies (usa.#, europe.#, #.news, and #.weather), a local private queue with a unique name.
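To see which of these four binding keys a given message would match, here is a minimal, broker-free sketch of topic matching. It assumes the usual AMQP topic-exchange rules: keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The function names (splitWords, matchFrom, topicMatches, matchingBindings) are hypothetical helpers written for this illustration and are not part of the Qpid API, and routing keys such as usa.news are example values.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Split a dotted key such as "usa.news" into its words.
std::vector<std::string> splitWords(const std::string& key) {
    std::vector<std::string> words;
    std::istringstream in(key);
    std::string word;
    while (std::getline(in, word, '.')) words.push_back(word);
    return words;
}

// Match pattern words [p..] against routing-key words [k..].
bool matchFrom(const std::vector<std::string>& pat, std::size_t p,
               const std::vector<std::string>& key, std::size_t k) {
    assert(p <= pat.size() && k <= key.size());
    if (p == pat.size()) return k == key.size();
    if (pat[p] == "#") {
        // '#' may absorb zero or more words: try every possible split.
        for (std::size_t next = k; next <= key.size(); ++next)
            if (matchFrom(pat, p + 1, key, next)) return true;
        return false;
    }
    if (k == key.size()) return false;
    // '*' matches any single word; otherwise the words must be equal.
    if (pat[p] == "*" || pat[p] == key[k])
        return matchFrom(pat, p + 1, key, k + 1);
    return false;
}

// Illustrative helper (not a Qpid call): does a binding key match a routing key?
bool topicMatches(const std::string& bindingKey, const std::string& routingKey) {
    return matchFrom(splitWords(bindingKey), 0, splitWords(routingKey), 0);
}

// Return the binding keys used in this section that match a routing key.
std::vector<std::string> matchingBindings(const std::string& routingKey) {
    const char* bindings[] = {"usa.#", "europe.#", "#.news", "#.weather"};
    std::vector<std::string> hits;
    for (const char* b : bindings)
        if (topicMatches(b, routingKey)) hits.push_back(b);
    return hits;
}
```

Under these rules, a message published with the example routing key usa.news matches both usa.# and #.news, so it would be routed to two of the listener's queues.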
To make it easier to manage our queue subscriptions, we can write a listener that can listen to multiple queues. Here is the class declaration for the listener:
class Listener : public MessageListener {
  private:
    Session& session;
    SubscriptionManager subscriptions;
  public:
    Listener(Session& session);
    virtual void prepareQueue(std::string queue, std::string routing_key);
    virtual void received(Message& message);
    virtual void listen();
    ~Listener() { };
};
This listener is the heart of the program, and makes it possible for the rest of the program to be extremely simple. It uses a subscription manager to keep track of the queue subscriptions. The subscription manager constructor takes the session as an argument. It is initialized in the listener's constructor:
Listener::Listener(Session& session) :
    session(session),
    subscriptions(session)
{
}
In this example, we use the subscription manager to subscribe to multiple queues, then run the subscription manager to receive messages from all subscribed queues. The prepareQueue() method creates a queue using a queue name and a routing key supplied as arguments:
listener.prepareQueue("usa", "usa.#");
The prepareQueue() method appends the session ID to the requested queue name so that the resulting name is unique and does not clash with queues used by other clients that subscribe to the same topic. It then declares the queue, binds it to the topic exchange, and subscribes to it through the subscription manager. The code for prepareQueue() is:
void Listener::prepareQueue(std::string queue, std::string routing_key) {
    /* Create a unique queue name for this consumer by concatenating
     * the queue name parameter with the Session ID.
     */
    queue += session.getId().str();
    std::cout << "Declaring queue: " << queue << std::endl;

    /* Declare an exclusive queue on the broker
     */
    session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true);

    /* Route messages to the new queue if they match the routing key.
     *
     * Also route any messages with the "control" routing key to
     * this queue so we know when it's time to stop. A publisher sends
     * a message with the content "That's all, folks!", using the
     * "control" routing key, when it is finished.
     */
    session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key);
    session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control");

    /*
     * Subscribe to the queue using the subscription manager.
     */
    std::cout << "Subscribing to queue " << queue << std::endl;
    subscriptions.subscribe(*this, queue);
}
The listener's listen() method listens to all subscribed queues by running the subscription manager:
void Listener::listen() {
    subscriptions.run();
}
When a message is received, the listener prints it. If the message signals termination, the listener cancels the subscription to the queue the message came from. Once no subscriptions remain in the subscription manager, the subscription manager stops and control returns to the application:
void Listener::received(Message& message) {
    std::cout << "Message: " << message.getData() << " from " << message.getDestination() << std::endl;

    if (message.getData() == "That's all, folks!") {
        std::cout << "Shutting down listener for " << message.getDestination() << std::endl;
        subscriptions.cancel(message.getDestination());
    }
}
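The cancel-on-termination flow can be tried out without a broker. The following is a rough sketch only: MockSubscriptions is a hypothetical stand-in written for this illustration, not the Qpid SubscriptionManager, and its deliver() and activeCount() members are invented for the mock. It assumes that the dispatch loop ends once every subscription has been cancelled, and it ignores deliveries for queues that have already been cancelled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a subscription manager; NOT the Qpid class.
class MockSubscriptions {
    std::set<std::string> active;                             // subscribed queue names
    std::deque<std::pair<std::string, std::string>> pending;  // (queue, body) deliveries
public:
    std::vector<std::string> log;  // "queue: body" for each handled message

    void subscribe(const std::string& queue) { active.insert(queue); }
    void cancel(const std::string& queue) { active.erase(queue); }

    // Mock-only helper: enqueue a message as if the broker had delivered it.
    void deliver(const std::string& queue, const std::string& body) {
        pending.push_back(std::make_pair(queue, body));
    }
    std::size_t activeCount() const { return active.size(); }

    // Dispatch until no subscriptions remain or no deliveries are left.
    void run() {
        while (!active.empty() && !pending.empty()) {
            std::pair<std::string, std::string> msg = pending.front();
            pending.pop_front();
            if (active.count(msg.first) == 0) continue;  // queue already cancelled
            received(msg.first, msg.second);
        }
    }

private:
    // Same shape as Listener::received(): record the message, then
    // cancel the subscription if it is the termination message.
    void received(const std::string& queue, const std::string& body) {
        assert(active.count(queue) == 1);
        log.push_back(queue + ": " + body);
        if (body == "That's all, folks!") cancel(queue);
    }
};
```

Because every queue is also bound to the "control" key, each subscription receives its own copy of the termination message, so each one is cancelled separately and the loop ends only after the last cancellation.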
In the Java and Python bindings, messages need to be explicitly acknowledged. In C++, by default this is not needed, although the acknowledgement policy can be set to require explicit message acknowledgement.
Now that we have the code for the listener, the code for the main program is simplified:
// Create a listener for the session
Listener listener(session);
// Subscribe to messages on the queues we are interested in
listener.prepareQueue("usa", "usa.#");
listener.prepareQueue("europe", "europe.#");
listener.prepareQueue("news", "#.news");
listener.prepareQueue("weather", "#.weather");
// Wait for the broker to indicate that our queues have been created.
session.sync();
std::cout << "Listening for messages ..." << std::endl;
// Give up control and receive messages
listener.listen();