4.4.1. Subscribing: Declaring Private Queues
In this application, the topic consumer sets up its own queues and binds them to the topic exchange with binding keys that identify the messages it is interested in. It declares queues for news, weather, usa, and europe, then binds them to the topic exchange using binding keys that contain wildcards. For instance, the news queue is bound using the binding key #.news, and the usa queue is bound using the binding key usa.#. In a binding key, # matches zero or more dot-separated words. If a message is published to the amq.topic exchange using the routing key usa.news, it matches both binding keys and is delivered to both the usa and news queues.
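The matching described above can be illustrated without a broker. The following is a minimal sketch, assuming the usual topic-exchange wildcard rules (# matches zero or more words, * matches exactly one word). The names topic_matches, bindings, and queues_for are hypothetical helpers for illustration and are not part of the client API.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key (assumed wildcard rules)."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        # Pattern exhausted: succeed only if every word was consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        # '*' matches exactly one word; anything else must match literally.
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


# Binding keys used by the subscriber in this section.
bindings = {
    "news": "#.news",
    "weather": "#.weather",
    "usa": "usa.#",
    "europe": "europe.#",
}


def queues_for(routing_key):
    """List the queues whose binding key matches the routing key."""
    return sorted(q for q, key in bindings.items() if topic_matches(key, routing_key))


print(queues_for("usa.news"))
print(queues_for("europe.weather"))
```

With these bindings, a message published with the routing key usa.news is selected for both the news and usa queues, which is the behavior the paragraph above describes.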
In this application, these queues are exclusive and cannot be used by other consumers. That also means that each queue must have a unique name. We use the session id, which is a universally unique identifier, to guarantee this. To get the session id, we keep the return value of session_open(), then extract the session id from it like this:
    session = client.session()
    session_info = session.session_open()
    session_id = session_info.session_id
Now that we have a session id, we can use it to construct unique names for our queues.
    import base64

    # Append the encoded session id so that each queue name is unique to this session.
    news = "news" + base64.urlsafe_b64encode(session_id)
    weather = "weather" + base64.urlsafe_b64encode(session_id)
    usa = "usa" + base64.urlsafe_b64encode(session_id)
    europe = "europe" + base64.urlsafe_b64encode(session_id)

    # Exclusive queues are private to this consumer.
    session.queue_declare(queue=news, exclusive=True)
    session.queue_declare(queue=weather, exclusive=True)
    session.queue_declare(queue=usa, exclusive=True)
    session.queue_declare(queue=europe, exclusive=True)
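The naming scheme can be tried on its own. On Python 3, base64.urlsafe_b64encode() returns bytes, so the result has to be decoded before it is joined to a string prefix. The sketch below assumes the session id is available as a byte string. Here uuid.uuid4().bytes stands in for the real session id, and private_queue_name is a hypothetical helper, not part of the client API.

```python
import base64
import uuid


def private_queue_name(prefix, session_id):
    """Build a queue name that is unique per session (hypothetical helper)."""
    # urlsafe_b64encode returns bytes on Python 3, so decode before joining.
    suffix = base64.urlsafe_b64encode(session_id).decode("ascii")
    return prefix + suffix


# Assumption: a random 16-byte UUID stands in for the real session id.
session_id = uuid.uuid4().bytes

names = {
    prefix: private_queue_name(prefix, session_id)
    for prefix in ("news", "weather", "usa", "europe")
}

for prefix, name in sorted(names.items()):
    print(prefix, "->", name)
```

Because the suffix is derived from the session id, two sessions get different queue names for the same prefix, while the four queues within one session differ only in their prefix.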
Now let's bind these queues to the amq.topic exchange, using binding keys to indicate which messages this consumer wishes to receive in each of its private queues.
    session.queue_bind(exchange="amq.topic", queue=news, routing_key="#.news")
    session.queue_bind(exchange="amq.topic", queue=weather, routing_key="#.weather")
    session.queue_bind(exchange="amq.topic", queue=usa, routing_key="usa.#")
    session.queue_bind(exchange="amq.topic", queue=europe, routing_key="europe.#")
Finally, we need a way to terminate when the server has finished publishing messages. The server indicates this by sending a message with the content “That's all, folks!”, using the routing key “control”. The subscriber needs to ensure that all messages are processed from each queue before terminating. To do this, we add a second binding to each queue, using the binding key “control”, so that every queue receives the terminating message.
    session.queue_bind(exchange="amq.topic", queue=news, routing_key="control")
    session.queue_bind(exchange="amq.topic", queue=weather, routing_key="control")
    session.queue_bind(exchange="amq.topic", queue=usa, routing_key="control")
    session.queue_bind(exchange="amq.topic", queue=europe, routing_key="control")
Because AMQP guarantees that messages on a single queue are received in order, we simply process the messages on each queue until the terminating message is received, at which point we know we are finished with that queue.
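The per-queue termination logic can be sketched as follows. This is an illustration only: in-memory deques stand in for the broker's queues, the message contents are made up, and drain is a hypothetical helper. A real subscriber would block waiting for the next message instead of stopping when the local container is empty.

```python
from collections import deque

TERMINATOR = "That's all, folks!"


def drain(queue):
    """Consume messages in order until the terminating message arrives.

    Returns the contents received before the terminator.
    """
    received = []
    while queue:
        content = queue.popleft()
        if content == TERMINATOR:
            # In-order delivery means nothing of interest follows on this queue.
            break
        received.append(content)
    return received


# Assumption: made-up message contents, each queue ending with the terminator.
queues = {
    "news": deque(["usa.news #1", "europe.news #1", TERMINATOR]),
    "usa": deque(["usa.news #1", "usa.weather #1", TERMINATOR]),
}

results = {name: drain(q) for name, q in queues.items()}

for name in sorted(results):
    print(name, results[name])
```

Each queue is drained independently, so the subscriber is finished only once the terminating message has been seen on every queue.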