6.4.5. Reading Messages from the Queue using a Message Listener
In the previous section, we read messages using a message consumer. In this section we read them using a message listener, which receives messages asynchronously: the JMS provider calls the listener as each message arrives. We create the connection, session, and queue exactly as we did for the message consumer:
// Load JNDI properties
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("direct.properties"));
// Create the initial context
Context ctx = new InitialContext(properties);
// Look up destination and connection factory
Destination destination = (Destination)ctx.lookup("directQueue");
ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
Connection connection = conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
    public void onException(JMSException jmse)
    {
        System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
        System.exit(0);
    }
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
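The class we are implementing is itself the message listener. We register it with the message consumer object, start the connection, and then wait for messages to arrive. The main thread waits on _lock until the listener sets one of two flags: _finished, which indicates that the final message has arrived, or _failed, which indicates that an invalid message has been received.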
MessageConsumer messageConsumer = session.createConsumer(destination);
// Set the listener and start the connection
messageConsumer.setMessageListener(this);
connection.start();
// Wait for the messageConsumer to have received all the messages it needs
synchronized (_lock)
{
    while (!_finished && !_failed)
    {
        _lock.wait();
    }
}
if (_failed)
{
    System.out.println(CLASS + ": ERROR: invalid message(s)");
}
// Once all messages have been received, close the connection and the JNDI context
connection.close();
getInitialContext().close();
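The wait loop above depends on a handshake between two threads: the main thread blocks on the lock while the provider's delivery thread calls the listener. The following is a minimal sketch of that handshake only, with no broker or JMS types involved. The class ListenerHandshakeSketch, its methods onText() and runDemo(), and the use of a null text to stand in for an invalid message are all hypothetical names and conventions chosen for illustration; they are not part of the sample.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the listener class; a plain thread plays the
// role of the JMS delivery thread, so no broker is needed to run this.
class ListenerHandshakeSketch {
    private final Object lock = new Object();
    private boolean finished = false; // guarded by lock
    private boolean failed = false;   // guarded by lock

    // Plays the role of onMessage(): runs on the delivery thread.
    // Assumption for this sketch: a null text represents an invalid message.
    void onText(String text) {
        synchronized (lock) {
            if (text == null) {
                failed = true;
            } else if (text.equals("That's all, folks!")) {
                finished = true;
            }
            // Wake the waiting thread so that it re-checks both flags.
            lock.notifyAll();
        }
    }

    // Plays the role of the main thread: blocks until either flag is set.
    // The while loop guards against spurious wakeups.
    boolean awaitCompletion() throws InterruptedException {
        synchronized (lock) {
            while (!finished && !failed) {
                lock.wait();
            }
            return !failed;
        }
    }

    // Delivers the given texts on a separate thread and waits for the outcome.
    // The list must contain the final message or a null, otherwise this blocks.
    static boolean runDemo(List<String> messages) {
        ListenerHandshakeSketch sketch = new ListenerHandshakeSketch();
        Thread delivery = new Thread(() -> messages.forEach(sketch::onText));
        delivery.start();
        try {
            boolean ok = sketch.awaitCompletion();
            delivery.join();
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = runDemo(Arrays.asList("one", "two", "That's all, folks!"));
        System.out.println("completed without failure: " + ok);
    }
}
```

Both flags are read and written only while holding the lock, which is what makes the condition check in the waiting thread safe. A java.util.concurrent.CountDownLatch would be a reasonable alternative to wait() and notifyAll() here, although the sample itself does not use one.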
A message listener implements an onMessage() method, which takes a Message as a parameter. Our onMessage() method prints the message, checks whether it is the final message, and signals termination if it is. If a JMSException is caught while handling a message, we set _failed to true and notify the waiting thread so that it can terminate.
public void onMessage(Message message)
{
    try
    {
        String text;
        if (message instanceof TextMessage)
        {
            text = ((TextMessage) message).getText();
        }
        else
        {
            byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()];
            ((BytesMessage) message).readBytes(body);
            text = new String(body);
        }
        if (text.equals("That's all, folks!"))
        {
            System.out.println(CLASS + ": Received final message " + text);
            synchronized (_lock)
            {
                _finished = true;
                _lock.notifyAll();
            }
        }
        else
        {
            System.out.println(CLASS + ": Received message: " + text);
        }
    }
    catch (JMSException exp)
    {
        System.out.println(CLASS + ": Caught an exception handling a received message");
        exp.printStackTrace();
        synchronized (_lock)
        {
            _failed = true;
            _lock.notifyAll();
        }
    }
}
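In the listener above, new String(body) decodes the bytes of a BytesMessage with the platform default charset, so the comparison with the final-message text depends on the producer and the consumer agreeing on an encoding. The sketch below shows the same decode-and-compare step with an explicit charset. It assumes the producer encoded the text as UTF-8, which the sample does not state; the class MessageTextSketch and its methods decode() and isFinal() are hypothetical helpers, not part of the sample or of the JMS API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the decode-and-compare step from onMessage()
// without any JMS types, so that it can be run on its own.
class MessageTextSketch {
    static final String FINAL_MESSAGE = "That's all, folks!";

    // Decode a message body with an explicit charset.
    // Assumption: the producer wrote the text as UTF-8.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Calling equals() on the constant avoids a NullPointerException
    // if the text is null.
    static boolean isFinal(String text) {
        return FINAL_MESSAGE.equals(text);
    }

    public static void main(String[] args) {
        byte[] body = FINAL_MESSAGE.getBytes(StandardCharsets.UTF_8);
        System.out.println("final message detected: " + isFinal(decode(body)));
    }
}
```

In a real listener, the byte array passed to decode() would be the one filled by readBytes() on the BytesMessage.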