6.6.4. Reading Messages from the Queue

6.6.4. Reading Messages from the Queue

The second program in the publish/subscribe example, Listener.java listens for messages from the wildcard topics usa.#, europe.#, #.news, and #.weather.

We start by creating a connection and a session as in the other examples:

Properties properties=new Properties();
properties.load(this.getClass().getResourceAsStream("pubsub.properties"));

Context ctx=new InitialContext(properties);

ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory");
TopicConnection connection=(TopicConnection) conFac.createConnection();

connection.setExceptionListener(new ExceptionListener()
{
    public void onException(JMSException jmse)
    {
        System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
        System.exit(0);
    }
});

TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

We write a function to create a message subscriber for a given topic:

private void createListener(Context ctx,TopicSession session,String topicName) throws Exception{
    // lookup the topic usa
    Topic topic=(Topic) ctx.lookup(topicName);
    // Create a Message Subscriber
    System.out.println(CLASS + ": Creating a Message Subscriber for topic " + topicName);
    javax.jms.TopicSubscriber messageSubscriber=session.createSubscriber(topic);

    // Set a message listener on the messageConsumer
    messageSubscriber.setMessageListener(new MyMessageListener(topicName));
}

Now we create a session for the connection,create listeners for each topic that interests us, and start the connection:

System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

createListener(ctx,session,"usa");
createListener(ctx,session,"europe");
createListener(ctx,session,"news");
createListener(ctx,session,"weather");

Now we start the connection to begin receiving messages, and wait until all messages have been received:

connection.start();

// Wait for the messageConsumer to have received all the messages it needs
synchronized (_lock)
{
    while (_finished < 4 && !_failed)
    {
        _lock.wait();
    }
}

// If the MessageListener abruptly failed (probably due to receiving a non-text message)
if (_failed)
{
    System.out.println(CLASS + ": This sample failed as it received unexpected messages");
}

Once all messages have been received, we close the connection and the JNDI context:

connection.close();
getInitialContext().close();

Here is the code for the MessageListener used in this example:

    private class MyMessageListener implements MessageListener
    {
        /* The topic this subscriber is subscribing  to */
        private String _topicName;

        public MyMessageListener(String topicName)
        {
            _topicName=topicName;
        }

        public void onMessage(Message message)
        {
            try
            {
                String text;
                if (message instanceof TextMessage)
                {
                    text=((TextMessage) message).getText();
                }
                else
                {
                    byte[] body=new byte[(int) ((BytesMessage) message).getBodyLength()];
                    ((BytesMessage) message).readBytes(body);
                    text=new String(body);
                }
                if (text.equals("That's all, folks!"))
                {
                    System.out.println(CLASS + ": Shutting down listener for " + _topicName);
                    synchronized (_lock)
                    {
                        _finished++;
                        _lock.notifyAll();
                    }
                }
                else
                {
                    System.out.println(CLASS + ": Received message for topic: " + _topicName + ": " + text);
                }
            }
            catch (JMSException exp)
            {
                System.out.println(CLASS + ": Caught an exception handling a received message");
                exp.printStackTrace();
                synchronized (_lock)
                {
                    _failed=true;
                    _lock.notifyAll();
                }
            }
        }
    }
}