4.4.3. Reading Messages from the Queue

Now let's return to the consumer program. Since we now have four queues to read from, let's define a function that reads the messages from a queue. We will use this function like this:

dump_queue(client,"news")
dump_queue(client,"weather")
dump_queue(client,"usa")
dump_queue(client,"europe")

The dump_queue() function creates a local queue, subscribes it to the corresponding queue on the broker, and reads from it until the queue is empty or a message reads “That's all, folks!”.

from Queue import Empty   # Python 2 standard library; raised when get() times out

def dump_queue(client, queue_name):

  print "Messages on the " + queue_name + " queue:"

  # Use the queue name as the consumer tag - need a unique tag

  consumer_tag = queue_name
  queue = client.queue(consumer_tag)

  # Call message_subscribe() to tell the broker to deliver messages
  # from the AMQP queue to a local client queue, then message_flow()
  # to grant credit (unit 0 = messages, unit 1 = bytes; 0xFFFFFFFF
  # means unlimited). Delivery starts once credit has been granted.
  # Assumes `session` is the session opened earlier in the consumer.

  session.message_subscribe(queue=queue_name, destination=consumer_tag)
  session.message_flow(consumer_tag, 0, 0xFFFFFFFF)
  session.message_flow(consumer_tag, 1, 0xFFFFFFFF)

  # Content of the last message read

  content = ""

  # "That's all, folks!" in a message body signals the last message

  final = "That's all, folks!"
  message = None

  while content != final:
    try:
      # A timeout is needed so that get() raises Empty instead of
      # blocking forever; 10 seconds is an illustrative value.
      message = queue.get(timeout=10)
      content = message.content.body
      print content
    except Empty:
      if message is not None:
        message.complete(cumulative=True)
      print "No more messages!"
      return

  if message is not None:
    message.complete(cumulative=True)
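
The read loop can be tried without a broker. The following is a minimal sketch, assuming a plain standard-library queue stands in for the local client queue and that message bodies are simple strings. The names drain(), FINAL, and the timeout value are illustrative and are not part of the tutorial's programs.

```python
try:
    from queue import Queue, Empty      # Python 3
except ImportError:
    from Queue import Queue, Empty      # Python 2

# Sentinel body that marks the last message, as in the tutorial.
FINAL = "That's all, folks!"

def drain(local_queue, final=FINAL, timeout=0.1):
    """Collect bodies until the sentinel arrives or the queue stays empty.

    Hypothetical helper: it only mirrors the control flow of dump_queue()
    and does not acknowledge messages or talk to a broker.
    """
    bodies = []
    while True:
        try:
            # With a timeout, get() raises Empty instead of blocking forever.
            body = local_queue.get(timeout=timeout)
        except Empty:
            break
        bodies.append(body)
        if body == final:
            break
    return bodies

if __name__ == "__main__":
    q = Queue()
    for body in ["usa.news 0", "usa.news 1", FINAL, "never read"]:
        q.put(body)
    for body in drain(q):
        print(body)
```

The loop stops at the sentinel, so anything queued after it is left unread. If the sentinel never arrives, the timeout ends the loop instead.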

Start the topic consumer first, then start the topic producer in another window. If you are successful, you will see the following output:

Messages on the news queue:
usa.news 0
usa.news 1
usa.news 2
usa.news 3
usa.news 4
That's all, folks!

Messages on the weather queue:
usa.weather 0
usa.weather 1
usa.weather 2
usa.weather 3
usa.weather 4
That's all, folks!

Messages on the usa queue:
usa.news 0
usa.news 1
usa.news 2
usa.news 3
usa.news 4
That's all, folks!

Messages on the europe queue:
europe.news 0
europe.news 1
europe.news 2
europe.news 3
europe.news 4
That's all, folks!

Congratulations! You have successfully implemented a publish/subscribe application in Python. The configuration program set up the message queues and bound them to a topic exchange, the publisher wrote messages to the exchange, and the consumer read messages from the queues.
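
To see why one message can appear in more than one queue, it may help to model how a topic exchange matches routing keys against binding keys. The sketch below follows the AMQP topic rules, where "*" matches exactly one word and "#" matches zero or more words. The binding keys in BINDINGS are assumptions made for illustration; the actual bindings are whatever the configuration program declared. The functions topic_matches() and route() are hypothetical helpers, not part of any client library.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key satisfies the topic binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        # Pattern exhausted: succeed only if every word was consumed.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            # '*' or a literal word consumes exactly one word.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)

# Assumed bindings (queue name -> binding key), for illustration only.
BINDINGS = {
    "news": "#.news",
    "weather": "#.weather",
    "usa": "usa.#",
    "europe": "europe.#",
}

def route(routing_key, bindings=BINDINGS):
    """List the queues that would receive a message with routing_key."""
    return sorted(name for name, key in bindings.items()
                  if topic_matches(key, routing_key))

if __name__ == "__main__":
    for key in ["usa.news", "europe.weather"]:
        print(key + " -> " + ", ".join(route(key)))
```

Under these assumed bindings, a message published with the routing key usa.news is copied to both the news queue and the usa queue, because both binding keys match it.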