6.5.4. Reading Messages from the Queue using a Message Listener
In the previous section, we read messages using a message consumer. In this section, we read them using a message listener, which receives messages asynchronously. As you will see, this program is very similar to the listener used for the direct example; it differs only in the JNDI property file used and in the code that ensures each listener has a queue with a unique name. We create the connection, session, and queue exactly as for the message consumer:
// Load JNDI properties
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));
// Create the initial context
Context ctx = new InitialContext(properties);
// Look up destination and connection factory
Destination destination = (Destination)ctx.lookup(queueName);
ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
Connection connection = conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
    public void onException(JMSException jmse)
    {
        // The connection may have broken; invoke reconnect code if available.
        System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
        System.exit(0);
    }
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
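The property-loading step above passes the result of getResourceAsStream() straight to Properties.load(). Class.getResourceAsStream() returns null when the resource is not on the classpath, in which case load() fails with a NullPointerException that does not name the missing file. The following is a minimal sketch of a more defensive loader. The class JndiPropertiesSketch, its load() helper, and the placeholder property value are hypothetical and not part of the sample; the property key shown is illustrative only and has not been checked against the real fanout.properties.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical helper, not part of the sample: loads a properties stream
// and reports a missing resource with a readable error.
class JndiPropertiesSketch
{
    static Properties load(InputStream in, String name) throws IOException
    {
        if (in == null)
        {
            // getResourceAsStream() returns null when the resource is absent
            throw new IOException("Resource not found on classpath: " + name);
        }
        try
        {
            Properties properties = new Properties();
            properties.load(in);
            return properties;
        }
        finally
        {
            in.close();
        }
    }

    public static void main(String[] args) throws IOException
    {
        // Placeholder content standing in for a real property file (assumption)
        String text = "connectionfactory.qpidConnectionfactory = PLACEHOLDER_CONNECTION_URL\n";
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1));
        Properties properties = load(in, "inline example");
        System.out.println(properties.getProperty("connectionfactory.qpidConnectionfactory"));
    }
}
```

In the sample itself, the equivalent call would pass this.getClass().getResourceAsStream("fanout.properties") and the file name to such a helper before creating the InitialContext.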
The class we are implementing is itself the message listener. We register it with the message consumer object, then wait for messages to arrive. The flag _failed indicates that a received message could not be handled.
MessageConsumer messageConsumer = session.createConsumer(destination);
// Set the listener and start the connection
messageConsumer.setMessageListener(this);
connection.start();
// Wait for the messageConsumer to have received all the messages it needs
synchronized (_lock)
{
    while (!_finished && !_failed)
    {
        _lock.wait();
    }
}
if (_failed)
{
    System.out.println(CLASS + ": This sample failed as it received unexpected messages");
}
// Once all messages have been received, close the connection and the JNDI context
connection.close();
ctx.close();
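The waiting logic above is a standard guarded wait: the main thread sleeps on _lock until the listener sets one of the flags and calls notifyAll(). The sketch below isolates that pattern without any JMS types, so it can be run on its own. The class ListenerWaitSketch and its methods onText(), onFailure(), awaitCompletion(), and runDemo() are hypothetical names introduced for illustration, and a plain thread stands in for the provider's delivery thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the sample class; no JMS types are used.
class ListenerWaitSketch
{
    private final Object _lock = new Object();
    private boolean _finished = false;
    private boolean _failed = false;
    private final List<String> _received = new ArrayList<String>();

    // Plays the role of onMessage(): record the text, signal on the final message
    void onText(String text)
    {
        synchronized (_lock)
        {
            _received.add(text);
            if (text.equals("That's all, folks!"))
            {
                _finished = true;
                _lock.notifyAll();
            }
        }
    }

    // Plays the role of the catch block: flag the failure and wake the waiter
    void onFailure()
    {
        synchronized (_lock)
        {
            _failed = true;
            _lock.notifyAll();
        }
    }

    // Blocks until a flag is set; the loop re-checks the condition after
    // every wake-up, which also guards against spurious wake-ups
    boolean awaitCompletion() throws InterruptedException
    {
        synchronized (_lock)
        {
            while (!_finished && !_failed)
            {
                _lock.wait();
            }
            return !_failed;
        }
    }

    int receivedCount()
    {
        synchronized (_lock)
        {
            return _received.size();
        }
    }

    static ListenerWaitSketch runDemo() throws InterruptedException
    {
        final ListenerWaitSketch sketch = new ListenerWaitSketch();
        Thread delivery = new Thread(new Runnable()
        {
            public void run()
            {
                sketch.onText("Message 1");
                sketch.onText("Message 2");
                sketch.onText("That's all, folks!");
            }
        });
        delivery.start();
        sketch.awaitCompletion();
        delivery.join();
        return sketch;
    }

    public static void main(String[] args) throws InterruptedException
    {
        ListenerWaitSketch sketch = runDemo();
        System.out.println("Received " + sketch.receivedCount() + " messages");
    }
}
```

Because the flags are only read and written while holding _lock, the waiting thread cannot miss a notification that arrives between its check of the condition and its call to wait().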
A message listener implements an onMessage() method, which takes a Message as its parameter. Our onMessage() method prints the message, checks whether it is the final message, and signals termination if it is. If a JMSException is thrown while handling the message, we set _failed to true and wake the waiting thread so that the program can terminate.
public void onMessage(Message message)
{
    try
    {
        String text;
        if (message instanceof TextMessage)
        {
            text = ((TextMessage) message).getText();
        }
        else
        {
            byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()];
            ((BytesMessage) message).readBytes(body);
            text = new String(body);
        }
        if (text.equals("That's all, folks!"))
        {
            System.out.println(CLASS + ": Received final message " + text);
            synchronized (_lock)
            {
                _finished = true;
                _lock.notifyAll();
            }
        }
        else
        {
            System.out.println(CLASS + ": Received message: " + text);
        }
    }
    catch (JMSException exp)
    {
        System.out.println(CLASS + ": Caught an exception handling a received message");
        exp.printStackTrace();
        synchronized (_lock)
        {
            _failed = true;
            _lock.notifyAll();
        }
    }
}
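In the bytes-message branch above, new String(body) decodes with the platform default charset, so the same payload can decode differently on different machines. The sketch below shows the decoding and final-message check with an explicit charset. It assumes the publisher encodes text as UTF-8, which the sample does not state; the class BodyDecodingSketch and its methods decode() and isFinal() are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the sample: decodes a message body and
// checks it against the final-message text used by the example.
class BodyDecodingSketch
{
    static final String FINAL_MESSAGE = "That's all, folks!";

    // Assumption: the body was encoded as UTF-8 by the publisher
    static String decode(byte[] body)
    {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Comparing from the constant avoids a NullPointerException if text is null
    static boolean isFinal(String text)
    {
        return FINAL_MESSAGE.equals(text);
    }

    public static void main(String[] args)
    {
        byte[] ordinary = "Message 1".getBytes(StandardCharsets.UTF_8);
        byte[] last = FINAL_MESSAGE.getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(ordinary) + " final? " + isFinal(decode(ordinary)));
        System.out.println(decode(last) + " final? " + isFinal(decode(last)));
    }
}
```

Inside onMessage(), the same idea would replace new String(body) with new String(body, StandardCharsets.UTF_8), provided the publisher really does use that encoding.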