6.6.4. Reading Messages from the Queue
The second program in the publish/subscribe example, Listener.java listens for messages from the wildcard topics usa.#, europe.#, #.news, and #.weather.
We start by creating a connection and a session as in the other examples:
Properties properties=new Properties();
properties.load(this.getClass().getResourceAsStream("pubsub.properties"));
Context ctx=new InitialContext(properties);
ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory");
TopicConnection connection=(TopicConnection) conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
public void onException(JMSException jmse)
{
System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
System.exit(0);
}
});
TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
We write a function to create a message subscriber for a given topic:
private void createListener(Context ctx,TopicSession session,String topicName) throws Exception{
// lookup the topic usa
Topic topic=(Topic) ctx.lookup(topicName);
// Create a Message Subscriber
System.out.println(CLASS + ": Creating a Message Subscriber for topic " + topicName);
javax.jms.TopicSubscriber messageSubscriber=session.createSubscriber(topic);
// Set a message listener on the messageConsumer
messageSubscriber.setMessageListener(new MyMessageListener(topicName));
}
Now we create a session for the connection,create listeners for each topic that interests us, and start the connection:
System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); createListener(ctx,session,"usa"); createListener(ctx,session,"europe"); createListener(ctx,session,"news"); createListener(ctx,session,"weather");
Now we start the connection to begin receiving messages, and wait until all messages have been received:
connection.start();
// Wait for the messageConsumer to have received all the messages it needs
synchronized (_lock)
{
while (_finished < 4 && !_failed)
{
_lock.wait();
}
}
// If the MessageListener abruptly failed (probably due to receiving a non-text message)
if (_failed)
{
System.out.println(CLASS + ": This sample failed as it received unexpected messages");
}
Once all messages have been received, we close the connection and the JNDI context:
connection.close(); getInitialContext().close();
Here is the code for the MessageListener used in this example:
private class MyMessageListener implements MessageListener
{
/* The topic this subscriber is subscribing to */
private String _topicName;
public MyMessageListener(String topicName)
{
_topicName=topicName;
}
public void onMessage(Message message)
{
try
{
String text;
if (message instanceof TextMessage)
{
text=((TextMessage) message).getText();
}
else
{
byte[] body=new byte[(int) ((BytesMessage) message).getBodyLength()];
((BytesMessage) message).readBytes(body);
text=new String(body);
}
if (text.equals("That's all, folks!"))
{
System.out.println(CLASS + ": Shutting down listener for " + _topicName);
synchronized (_lock)
{
_finished++;
_lock.notifyAll();
}
}
else
{
System.out.println(CLASS + ": Received message for topic: " + _topicName + ": " + text);
}
}
catch (JMSException exp)
{
System.out.println(CLASS + ": Caught an exception handling a received message");
exp.printStackTrace();
synchronized (_lock)
{
_failed=true;
_lock.notifyAll();
}
}
}
}
}