4.4.3. Reading Messages from the Queue
Now let's return to the consumer program. Since we now have four queues to read from, let's define a function that reads the messages from a queue. We will use this function like this:
dump_queue(client, "news")
dump_queue(client, "weather")
dump_queue(client, "usa")
dump_queue(client, "europe")
The dump_queue() function creates a local queue, subscribes it to the corresponding queue on the broker, and reads from it until the queue is empty or a message reads “That's all, folks!”.
from qpid.queue import Empty

def dump_queue(client, queue_name):

    print "Messages on the " + queue_name + " queue:"

    # Use the queue name as the consumer tag; the tag must be unique.
    consumer_tag = queue_name
    queue = client.queue(consumer_tag)

    # Call message_subscribe() to tell the broker to deliver messages
    # from the AMQP queue to the local client queue, then call
    # message_flow() to grant credit: unit 0 counts messages, unit 1
    # counts bytes, and 0xFFFFFFFF means "no limit".
    # `session` is assumed to be the session opened earlier in this program.
    session.message_subscribe(queue=queue_name, destination=consumer_tag)
    session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
    session.message_flow(consumer_tag, 1, 0xFFFFFFFF)

    # Content of the last message read
    content = ""
    # "That's all, folks!" in a message body signals the last message
    final = "That's all, folks!"
    message = None

    while content != final:
        try:
            # Without a timeout, get() blocks indefinitely and Empty is
            # never raised; 10 seconds is an illustrative value.
            message = queue.get(timeout=10)
            content = message.content.body
            print content
        except Empty:
            # Mark everything received so far as completed
            if message is not None:
                message.complete(cumulative=True)
            print "No more messages!"
            return

    # Mark all messages up to and including the last one as completed
    if message is not None:
        message.complete(cumulative=True)
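The read loop in dump_queue() can be tried without a broker. The following is a minimal sketch, not part of the Qpid example: it assumes a standard-library Queue as a stand-in for the local client queue, and the names drain() and FINAL are hypothetical. It shows only the two stop conditions, the final message and an empty queue.

```python
try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2

# Hypothetical name for the body that marks the last message.
FINAL = "That's all, folks!"

def drain(local_queue, timeout=0.1):
    """Collect message bodies until FINAL arrives or nothing more arrives."""
    bodies = []
    while True:
        try:
            body = local_queue.get(timeout=timeout)
        except Empty:
            # Nothing arrived within the timeout: treat the queue as drained.
            break
        bodies.append(body)
        if body == FINAL:
            break
    return bodies

# Stand-in for a local queue that the broker has already delivered to.
news = Queue()
for body in ["usa.news 0", "usa.news 1", FINAL, "never read"]:
    news.put(body)

received = drain(news)
print(received)
```

Anything queued after the final message is left unread, and an empty queue ends the loop once the timeout expires, which mirrors the two exits in dump_queue().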
Start the topic consumer first, then start the topic producer in another window. If you are successful, you will see the following output:
Messages on the news queue:
usa.news 0
usa.news 1
usa.news 2
usa.news 3
usa.news 4
That's all, folks!
Messages on the weather queue:
usa.weather 0
usa.weather 1
usa.weather 2
usa.weather 3
usa.weather 4
That's all, folks!
Messages on the usa queue:
usa.news 0
usa.news 1
usa.news 2
usa.news 3
usa.news 4
That's all, folks!
Messages on the europe queue:
europe.news 0
europe.news 1
europe.news 2
europe.news 3
europe.news 4
That's all, folks!
Congratulations! You have successfully implemented a publish/subscribe application in Python. The configuration program set up the message queues and bound them to a topic exchange, the publisher wrote messages to the exchange, and the consumer read messages from the queues.