Building reliable JMS applications
[print_link]
Introduction
This article assumes you have some understanding of JMS internals. If not, you might want to start here. Here we describe what reliability means and how to build reliable JMS applications. To understand the source code you must be familiar with EJBs, XDoclet and Ant. The application used in the article showcases some nifty features like message selectors, dependancy injection via Xdoclet and Ant, EJB ENC etc.
Reliability
Reliability in computer science can be summed up in four words – Atomicity, Consistency, Isolation, Durability (
We create a new class called DemoBean and implement MessageDrivenBean and MessageListener. We inherit EJB behavior from MessageDrivenBean. The tags on top of the class are XDoclet tags which help us automatically create deployment descriptor files required by the EJB container. For more information on how to use Xdoclet please see here.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | /** * A simple message consumer * * @ejb.bean name="DemoBean" display-name="Demo MDB to demonstrate JMS features" * jndi-name="ejb/Demo" description="Listens to 'Demo' messages" * destination-type="javax.jms.Queue" * acknowledge-mode="Auto-acknowledge" * transaction-type="Container" * message-selector="Type='Even'" * * @ejb.env-entry name="MAX_RETRIES" type="java.lang.Integer" * value="${max.request.retries}" description="If an application * error occurs, message will be rolled back to the Queue and * retried this many times." * * @jboss:destination-jndi-name name="${jndi.queue.name}" * @websphere.mdb listenerPort="${jndi.queue.listener}" */ public class DemoBean implements MessageDrivenBean, MessageListener { private static final long serialVersionUID = -6540893609236685101L; private MessageDrivenContext messageDrivenContext = null; |
The following simple ant target creates all necessary boilerplate code.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | <target name="ejbdoclet" description="Generates deployment descriptors etc."> <taskdef name="ejbdoclet" classname="xdoclet.modules.ejb.EjbDocletTask"> <classpath refid="project.classpath" /> </taskdef> <ejbdoclet ejbSpec="2.0" destDir="${gensrc.dir}"> <fileset dir="${src.dir}"> <include name="**/*Bean.java" /> </fileset> <deploymentdescriptor destDir="${ejb.descriptors.dir}"> </deploymentdescriptor> <websphere destDir="${ejb.descriptors.dir}"> </websphere> <jboss Version="3.0" destDir="${ejb.descriptors.dir}"> </jboss> </ejbdoclet> </target> |
The @ejb.bean tag in the Java class creates the following code in ejb-jar.xml file.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | <message-driven > <description><![CDATA[Listens to 'Demo' messages]]></description> <display-name>Demo MDB to demonstrate JMS features</display-name> <ejb-name>DemoBean</ejb-name> <ejb-class>com.ksi.demo.ejb.DemoBean</ejb-class> <transaction-type>Container</transaction-type> <message-selector><![CDATA[Type='Even']]></message-selector> <acknowledge-mode>Auto-acknowledge</acknowledge-mode> <message-driven-destination> <destination-type>javax.jms.Queue</destination-type> </message-driven-destination> <env-entry> <description><![CDATA[If an application error occurs, message will be rolled back to the Queue and retried this many times.]]></description> <env-entry-name>MAX_RETRIES</env-entry-name> <env-entry-type>java.lang.Integer</env-entry-type> <env-entry-value><![CDATA[4]]></env-entry-value> </env-entry> </message-driven> |
The transaction-type tag in the ejb-jar file lets the EJB container start and manage transactions for us. The message-selector tag lets the Connection know that only messages assigned a property ‘Type’ and its value ‘Even’ be sent to this bean. The tag subscription-durability tells the JMS Container to persist the messages in case of a container failure. In addition to these we create an variable (available only in the ENC of this bean) called MAX_RETRIES.
As you may have noticed the values of MAX_RETRIES, jboss destination-jndi-name and websphere.mdb listenerPort are enclosed in ${}. These are properties defined in build.properties available to Ant. Ant populates these to generate ejb-jar file. We inject this depedancy at run time. We will discuss MAX_RETRIES later. Since we implement MessageListener, we inherit onMessage() method. When a message arrives in a queue, the JMS container invokes the onMessage() of MDB to pass the message object.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | /** * This is called when there is a message in the Queue. * * @ejb.transaction type="Required" */ public void onMessage(Message msg) { try { System.out.println("Message:" + msg); MapMessage message = (MapMessage) msg; Enumeration e = message.getMapNames(); Hashtable data = new Hashtable(); while (e.hasMoreElements()) { String key = (String) e.nextElement(); String val = message.getString(key); data.put(key, val); } |
Since the client posts a MapMessage, we cast the generic Message interface to MapMessage and retrieve data stored in the map. This is the basic work flow of a JMS application. You can run ant, deploy the EAR file in JBoss or WebSphere to test this behavior.
Atomicity
Atomicity refers to the ability of JMS application to gaurantee that all tasks within a transaction are performed or none at all. A transaction is a group of tasks that run as one single unit of work. In other words all the tasks are performed successfully or none at all. So when a failure occurs (OutOfMEmoryErrors, connection failures etc.) tasks in a transaction are undone from where the failure occurred. In this scenario, tasks that need to be performed are the following:
- Receive message from the queue
- Print the payload in message (the even number)
- Acknowledge that message was received and that all processing was done
For our bean to be reliable, we require all these tasks to be performed. In other words, message needs to be processed atomically. When a failure occurs while message is being processed, we would like to put the message back in the queue so it can be picked up again for processing (hopefully after the cause of the failure has been fixed). This process is called a rollback. There are two ways a transaction can be performed in a EJB container:
- by the MDB (Bean Managed Transaction)
- by the container (Container Managed Transaction)
In Bean Managed Transaction, as the name suggests, the bean is responsible for the three tasks. However, message receipt (first task) that causes the bean to be invoked is not part of the transaction. An example of bean managed transaction.
public void onMessage(Message msg) { UserTransaction ut = messageDrivenContext.getUserTransaction(); try { ut.begin(); System.out.println("Message:" + msg); MapMessage message = (MapMessage) msg; Enumeration e = message.getMapNames(); Hashtable data = new Hashtable(); while (e.hasMoreElements()) { String key = (String) e.nextElement(); String val = message.getString(key); data.put(key, val); } ut.commit(); } catch (Exception ex) { try { ut.rollback(); } catch (SystemException e) { throw new EJBException("Rollback failed" + e.getMessage()); } throw new EJBException ("Transaction failed: " + e.getMessage()); } }
The following picture shows demarcation of a baen managed transaction.
|
|
|
|
Since this is not what we want, we request the EJB container to manage transactions for us. The transaction-type tag in the ejb-jar file does exactly this. The EJB container’s transaction manager gaurantees that message will be delivered to the MDB, and when the processing is done, automatically acknowledges the JMS container which then discards the message. If for some reason an acknowledgement is not sent, the JMS container resends the message to the MDB.
|
|
|
|
Hence the three steps are executed as one unit of work or in other words atomically.
Consistency
Consistency refers to the property of an application being in a legal state when the transaction begins and when it ends. This means that a transaction moves the state of entities from one consistent state to another. For an application to be reliable, when a problem occurs the application needs to revert to the last consistent state. In this DemoBean, lets simulate a failure condition by throwing an exception at random.
122 123 124 125 126 127 | // Randomly throw an exception so that the message will be rolled // back. Random random = new Random(); if (random.nextInt(3) == 1) { throw new JMSException("Something bad happened!"); } |
There are two things we can do to revert to a previous consistent state: (a) re-throw the exception to let the transaction manager know that there has been a problem or (b) do not acknowledge the message. Re-throwing an exception automatically rolls the transaction back which puts the message back in the queue. This is the advantage of using an MDB. However, if the problem persists, we keep getting into the same failure scenario. So this does not help a lot. We want to be able to control the rollback process so that the same message is not tried repeatedly draining the resources. For this reason we write a method to control the rollback process:
130 131 132 | } catch (JMSException e) { rollback(msg, e); } |
Inside the rollback() method we check for Delivery Count in the header field. In JBoss this field is JMS_JBOSS_REDELIVERY_COUNT and in WebSphere this is JMSXDeliveryCount. If the message has been delivered more than MAX_RETRIES times, instead of rolling the message back we send an email to a human agent. MAX_RETRIES is a variable available in the Environment Naming Context of this bean. Is is injected at build time using Ant.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | try { InitialContext context = new InitialContext(); maxRetries = (Integer) context.lookup("java:comp/env/MAX_RETRIES"); } catch (NamingException e2) { System.out.println(e2.getMessage()); } try { if (message.propertyExists("JMSXDeliveryCount")) { retryAttempt = message.getIntProperty("JMSXDeliveryCount"); } } catch (JMSException e2) { } try { String messageID = message.getJMSMessageID(); if (retryAttempt < maxRetries.intValue()) { messageDrivenContext.setRollbackOnly(); System.out.println("Message " + messageID + " rolledback due to " + cause.getMessage()); } else { TextMessage textMessage = (TextMessage) message; |
Since the exception is processed gracefully, the transaction manager correctly assumes that the message processing was successful and automatically acknowledges the JMS container.
Isolation
Isolation refers to the ability of the application to make operations in a transaction appear isolated from all other operations. According to the EJB Specification version 2.1, transactions not only make completion of a unit of work atomic, but they also isolate the units of work from each other, provided that the system allows concurrent execution of multiple units of work. Hence we are guranteed that an invocation of onMessage() method has no impact on invocation of onMessage() on another MDB instance. Note that an EJB container allows many instances of a message-driven bean class to be executing concurrently, thus allowing for the concurrent processing of a stream of messages. The isolation level describes the degree to which the access to a resource manager by a transaction is isolated from the access to the resource manager by other concurrently executing transactions. For more information on isolation levels please see Sun’s J2EE tutorial.
Durability
Durability refers to the guarantee that a successful transaction will persist and can not be undone. JMS supports two modes of message delivery.
- The NON_PERSISTENT mode is the lowest-overhead delivery mode because it does not require that the message be stored in a stable storage. A JMS provider failure can cause a NON_PERSISTENT message to be lost.
- The PERSISTENT mode instructs the JMS container to take extra care to insure the message is not lost in transit due to a JMS provider failure.
Delivery mode is set to PERSISTENT by default. JMS providers never produce duplicate messages. This means that a producer that produces a message can rely on the JMS container to insure that consumers of the message (MDB) will receive it only once. No client error can cause a provider to duplicate a message.
Attachments
Source code
Unzip and run ant in the project’s root folder.









Leave your response!