We’re currently working on a JMS-drived application, which is being used as an integration point between several systems. We’ve defined a standard exception handling process – checked exceptions for those errors that can be handled by the system, unchecked exceptions for those errors that cannot be handled. The unchecked exceptions are all allowed to bubble up, where they are caught at the top level (and the top level only).
Given that the entire system is driven by received messages, the “top” of our application is our MessageListener
, which before today looked a little like this:
public void onMessage(Message m) {
try {
process(m);
} catch (Throwable t) {
log(t);
}
}
As per our exception handling strategy, this is the only place where unchecked exceptions are caught. Today we started implementing the various sad-path scenarios, the first of which was “what happens if processing the message fails?”. In our case this scenario translates as process
throwing an unchecked exception. This being our first scenario, we’ve assumed that any runtime exception is a transient error – possibly down to one of the systems we’re integrating with being down. As such we decided that we’ll want to attempt to process the message again.
The simplest way to re-try a message is to use a transactional JMS session and rollback the session, as this returns the message back on to the topic/queue – we’d then specify a maximum number of times a message can be retried. It also follows that when using a transactional session you need to commit the session if the message was successfully processed.
Adding the code to commit a transaction is straightforward – but we do have to expose the session inside the listener (we’re using a topic here for monitoring purposes):
public OurMessageListener(TopicSession topicSession) {
this.topicSession = topicSession;
}
public void onMessage(Message m) {
try {
process(m);
//if we get here, we've processed the message
topicSession.commit();
} catch (Throwable t) {
log(t);
}
}
Adding the code to roll a session back is a bit more work:
public void onMessage(Message m) {
try {
process(m);
topicSession.commit();
} catch (Throwable t) {
log(t);
topicSession.rollback();
}
}
Great so far, but our next “sad-path” scenario is going to give us a little more trouble. What if we receive a message that we can’t translate into something meaningful? We don’t want to re-try the message, as we know we’re not going to be able to handle it later – the message is just plain bad. To handle this case, we separated out the message processing and had it throw a checked exception:
public void onMessage(Message m) {
try {
Object command = new MessageHandler(m);
process(command);
topicSession.commit();
} catch (MessageInvalidException e) {
log(t);
//we don't want to retry the message
topicSession.commit();
} catch (Throwable t) {
log(t);
topicSession.rollback();
}
}
This works as far as it goes, but up to this point I’ve been oversimplifying things a little. We’d abstracted our use of JMS behind two simple methods, which so far had been good enough for use in all our tests, and both client and server code:
public interface MessagingFacade {
void subscribe(MessageListener listener);
void publish(String xmlToSend);
}
The subscribe
call hides all the JMS plumbing – including the creation of the session itself. If we want to pass our session into the message listener, we need to expose it from the MessagingFacade
or create it ourself – either way we kind of defeat the object of the facade. If we don’t use the facade, we end up complicating much of our code.
The solution we came up with was to create a TransactionalMessagingListener
like so:
public class TransactionalMessagingListener
implements MessageListener {
public TransactionalMessagingListener(
TopicSession topicSession,
MessageListener delegate) {
...
}
public void onMessage(Message m) {
try {
delegate.onMessage(m)
topicSession.commit();
} catch (Throwable t) {
log(t);
topicSession.rollback();
}
}
Our underlying message listener is no longer the top of our system so doesn’t need to log throwable. Nor does it need to worry about the TopicSession
, so becomes much simpler – we catch and log the checked exception related to message processing and let any unchecked exceptions bubble up to the TransactionalMessagingListener
:
public void onMessage(Message m) {
try {
Object command = new MessageHandler(m);
process(command);
} catch (MessageInvalidException e) {
log(t);
}
}
And finally we change our MessagingFacade
a little, making the subscribe
method more specific by calling it subscribeWithTransaction
, and wrapping the listener with our new TransactionalMessageListener
:
public void subscribeWithTransaction(
MessageListener listener) {
...
TransactionalMessageListener txnListener =
new TransactionalMessageListener(
topicSession,
listener
);
...
}
And there we have it. All the code is simple and testable – and not a dynamic proxy in sight (take that AOP nut-cases!). I still can’t help thinking there was a simpler way of handling this though…