6.4.5. Reading Messages from the Queue using a Message Listener

6.4.5. Reading Messages from the Queue using a Message Listener

In the previous section, we read messages using a message consumer. In this section we read using a message listener, which receives messages asynchronously. We create the connection, session, and queue precisely as for the message consumer:

// Load JNDI properties
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("direct.properties"));

//Create the initial context
Context ctx = new InitialContext(properties);

// look up destination and connection factory
Destination destination = (Destination)ctx.lookup("directQueue");
ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");

Connection connection = conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
    public void onException(JMSException jmse)
    {
        System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
        System.exit(0);
    }
});

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

The class we are implementing is the message listener class, we register it with the message consumer object, then wait for messages to arrive. We use the flag _failed to indicate that an invalid message has been received.

MessageConsumer messageConsumer = session.createConsumer(destination);

// Set the listener and start the connection
messageConsumer.setMessageListener(this);
connection.start();

// Wait for the messageConsumer to have received all the messages it needs
synchronized (_lock)
{
    while (!_finished && !_failed)
    {
        _lock.wait();
    }
}
if (_failed)
{
    System.out.println(CLASS + ": ERROR: invalid message(s)");
}

// Once all messages have been received, close the connection and the JNDI context
connection.close();
getInitialContext().close();

A message listener implements an onMessage() method, which takes a Message as a parameter. Our onMessage method prints the message, checks to see if it is the final message, and signals termination if it is. If an exception is received, we set _failed to true before termination.

public void onMessage(Message message)
{
    try
    {
        String text;
        if (message instanceof TextMessage)
        {
            text = ((TextMessage) message).getText();
        }
        else
        {
            byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()];
            ((BytesMessage) message).readBytes(body);
            text = new String(body);
        }
        if (text.equals("That's all, folks!"))
        {
            System.out.println(CLASS + ": Received final message " + text);
            synchronized (_lock)
            {
                _finished = true;
                _lock.notifyAll();
            }
        }
        else
        {
            System.out.println(CLASS + ": Received  message:  " + text);
        }
    }
    catch (JMSException exp)
    {
        System.out.println(CLASS + ": Caught an exception handling a received message");
        exp.printStackTrace();
        synchronized (_lock)
        {
            _failed = true;
            _lock.notifyAll();
        }
    }
}