JMS Internals
Messaging Styles
JMS defines two messaging styles: Point-to-Point and Publish/Subscribe. These styles are often referred to as messaging domains, and each represents a common model for messaging.
Point-to-Point (PTP) Model
In this model, a message sent by a producer is delivered to exactly one consumer. Messages are held in a queue until a consumer becomes active and receives them.
A Queue is an object that represents a JMS provider’s queue. JMS does not define API methods to create, administer or delete Queues. However, the API does provide a way to create temporary queues. A temporary queue is typically used to receive reply messages, and it exists only for the lifetime of the connection that created it.
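The delivery rule can be pictured with a small in-memory model. This is only a sketch built on java.util.concurrent, not the JMS API; the class name PointToPointSketch and its methods are hypothetical names chosen for illustration. It shows messages waiting until consumers appear, and each message going to exactly one of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory illustration only; this is not the JMS API.
final class PointToPointSketch {

    // Messages sit here until some consumer takes them.
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    void send(String message) {
        queue.add(message);
    }

    // A consumer takes the next waiting message, or null if none is waiting.
    String receiveNoWait() {
        return queue.poll();
    }

    // Sends three messages before any consumer exists, then lets two
    // consumers take turns. Returns what each consumer received.
    static List<List<String>> drainWithTwoConsumers() {
        PointToPointSketch ptp = new PointToPointSketch();
        ptp.send("m1");
        ptp.send("m2");
        ptp.send("m3");

        List<String> consumerA = new ArrayList<>();
        List<String> consumerB = new ArrayList<>();
        boolean turnA = true;
        String message;
        while ((message = ptp.receiveNoWait()) != null) {
            (turnA ? consumerA : consumerB).add(message);
            turnA = !turnA;
        }
        return Arrays.asList(consumerA, consumerB);
    }

    public static void main(String[] args) {
        // No message appears in both lists.
        System.out.println(drainWithTwoConsumers());
    }
}
```

A real provider decides which of several competing consumers receives a given message; the strict alternation here is an assumption made to keep the sketch deterministic.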
    try {
        Object ref = jndiContext.lookup("QueueConnectionFactory");
        QueueConnectionFactory qcf = (QueueConnectionFactory) ref;
        QueueConnection qc = qcf.createQueueConnection();
        QueueSession session = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue queue = session.createTemporaryQueue();
    } catch (Exception e) {
        // ...
    }
Static queues are created using facilities provided by the JMS provider. To create a static queue in JBoss, an XML file with the following contents is placed in the deploy folder. WebSphere provides an administrative console for creating static queues.
    <!-- Creates a Queue in JBoss -->
    <mbean code="org.jboss.mq.server.jmx.Queue"
           name="jboss.mq.destination:service=Queue,name=KSIDemoQueue">
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    </mbean>
Publish/Subscribe (pub/sub) Model
In this model, JMS clients publish messages to and subscribe to messages from a well-known node called a Topic. A topic acts as a mini message broker that delivers each message to all clients registered with it. In this model the terms publish and subscribe are used in place of the more generic terms produce and consume.
Another key difference from the point-to-point model is that JMS supports durable subscriptions, which the provider remembers even while the subscriber is inactive. Messages published while a durable subscriber is inactive are delivered to it when it becomes active again. JMS retains a durable subscription’s messages until they are received by the subscriber or until they expire.
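The difference between a durable and a non-durable subscriber can be sketched with an in-memory model. This is not the JMS API; DurableTopicSketch and all of its methods are hypothetical names used only for illustration, and message expiry is left out for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only; this is not the JMS API.
final class DurableTopicSketch {

    // State kept for one named subscriber.
    private static final class Subscription {
        final boolean durable;
        boolean active = true;
        final List<String> delivered = new ArrayList<>();
        final List<String> backlog = new ArrayList<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    void subscribe(String name, boolean durable) {
        subscriptions.put(name, new Subscription(durable));
    }

    void setActive(String name, boolean active) {
        Subscription s = subscriptions.get(name);
        s.active = active;
        if (active) {
            // Hand over whatever was retained while the subscriber was away.
            s.delivered.addAll(s.backlog);
            s.backlog.clear();
        }
    }

    // Every active subscriber gets a copy; inactive durable ones get it later.
    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            if (s.active) {
                s.delivered.add(message);
            } else if (s.durable) {
                s.backlog.add(message);
            }
            // Inactive non-durable subscribers simply miss the message.
        }
    }

    List<String> deliveredTo(String name) {
        return subscriptions.get(name).delivered;
    }

    static DurableTopicSketch demo() {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.subscribe("durable", true);
        topic.subscribe("plain", false);
        topic.publish("m1");                 // both subscribers are active
        topic.setActive("durable", false);
        topic.setActive("plain", false);
        topic.publish("m2");                 // both subscribers are inactive
        topic.setActive("durable", true);
        topic.setActive("plain", true);
        return topic;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = demo();
        System.out.println("durable: " + topic.deliveredTo("durable")); // [m1, m2]
        System.out.println("plain:   " + topic.deliveredTo("plain"));   // [m1]
    }
}
```

Both subscribers see m1, but only the durable one receives m2, which was published while it was away.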
Just as in the previous model, JMS does not define API methods to create, administer or delete Topics. However, a temporary topic may be created from a TopicSession; like a temporary queue, it exists only for the lifetime of the connection that created it.
    try {
        TopicConnectionFactory tcf = (TopicConnectionFactory) jndiContext.lookup("TopicConnectionFactory");
        TopicConnection tc = tcf.createTopicConnection();
        TopicSession session = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic topic = session.createTemporaryTopic();
    } catch (Exception e) {
        // ...
    }
In JBoss, a static topic can be created by deploying the following MBean descriptor.
    <mbean code="org.jboss.mq.server.jmx.Topic"
           name="jboss.mq.destination:service=Topic,name=KSIDemoTopic">
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    </mbean>
JMS Facilities
The following steps set up a JMS client that sends messages to a Destination (a Queue or a Topic).
- Initialize the JNDI context
- Use JNDI to find a ConnectionFactory object
- Use JNDI to find one or more Destination objects
- Use the ConnectionFactory to create a JMS Connection
- Use the Connection to create one or more JMS Sessions
- Use a Session and the Destinations to create the MessageProducers and MessageConsumers needed
- Tell the Connection to start delivery of messages
At this point a client has the basic JMS setup needed to produce messages.
    public void setUp() throws JMSException, NamingException {
        // 1. Initialize JNDI context with server info
        Properties props = new Properties();
        props.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        props.put(InitialContext.PROVIDER_URL, "jnp://localhost:1099");
        props.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        InitialContext iniCtx = new InitialContext(props);

        // 2. Look for a connection factory object (administered object)
        Object tmp = iniCtx.lookup("QueueConnectionFactory");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;

        // 3. Look for the queue (destination object)
        requestQueue = (Queue) iniCtx.lookup("queue/KSIDemoQueue");

        // 4. Obtain a connection
        queueConnection = qcf.createQueueConnection();

        // 5. Request a session
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        // 6. Obtain a sender reference (message producer)
        queueSender = queueSession.createSender(requestQueue);

        // 7. Tell the connection to start delivering messages
        queueConnection.start();
    }
A ConnectionFactory encapsulates a set of connection configuration parameters and is used by the client to create a connection with the JMS provider. Both QueueConnectionFactory and TopicConnectionFactory inherit from ConnectionFactory. A Connection encapsulates an open connection with a JMS provider. Typically this is a TCP/IP socket connection between the client and the JMS provider’s service daemon. A newly created connection is in stopped mode, so no messages are delivered to its consumers. When the setup is complete, the client calls start() on the connection object and messages begin arriving at the connection’s consumers.
Pausing message delivery
Message delivery can be paused by invoking stop() on a connection. Delivery resumes when start() is invoked. Note that a MessageProducer can still send messages while a Connection is stopped. Stopping a stopped connection or starting a started connection has no effect.
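These rules can be traced with a small in-memory model. It is a sketch under simplifying assumptions, not the JMS API: PausableConnectionSketch and its methods are hypothetical names, and a single object plays the role of producer, connection and consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory illustration only; this is not the JMS API.
final class PausableConnectionSketch {

    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> consumed = new ArrayList<>();
    private boolean started = false;   // a new connection starts out stopped

    // Producing is allowed whether or not delivery is paused.
    void send(String message) {
        pending.add(message);
        deliverIfStarted();
    }

    void start() {
        started = true;                // calling start() twice changes nothing
        deliverIfStarted();
    }

    void stop() {
        started = false;               // calling stop() twice changes nothing
    }

    private void deliverIfStarted() {
        while (started && !pending.isEmpty()) {
            consumed.add(pending.remove());
        }
    }

    List<String> consumed() {
        return consumed;
    }

    // Records how many messages have been consumed after each step.
    static List<Integer> demo() {
        PausableConnectionSketch connection = new PausableConnectionSketch();
        List<Integer> consumedCounts = new ArrayList<>();

        connection.send("m1");                     // sent while stopped
        consumedCounts.add(connection.consumed().size());

        connection.start();                        // delivery begins
        consumedCounts.add(connection.consumed().size());

        connection.stop();
        connection.stop();                         // second stop is a no-op
        connection.send("m2");                     // sending is still allowed
        consumedCounts.add(connection.consumed().size());

        connection.start();
        connection.start();                        // second start is a no-op
        consumedCounts.add(connection.consumed().size());
        return consumedCounts;
    }

    public static void main(String[] args) {
        System.out.println(demo());                // [0, 1, 1, 2]
    }
}
```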
Receiving messages synchronously
Messages can be received synchronously or asynchronously. The MessageConsumer subinterfaces QueueReceiver and TopicSubscriber inherit the receive() method, which receives messages synchronously.
    queueReceiver = queueSession.createReceiver(queue);
    TextMessage stockMessage;
    stockMessage = (TextMessage) queueReceiver.receive();
This method is a blocking call: it does not return until a message arrives or the consumer is closed. To limit the wait, pass a timeout to receive().
    queueReceiver = queueSession.createReceiver(queue);
    TextMessage stockMessage;
    // wait for 4 secs
    stockMessage = (TextMessage) queueReceiver.receive(4000);
The timeout is given in milliseconds. If it expires before a message arrives, receive() returns null.
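The behaviour of a timed receive resembles BlockingQueue.poll() with a timeout in the standard library. The sketch below uses that analogy and is not the JMS API; TimedReceiveSketch and its methods are hypothetical names. It shows why the result of a timed receive is worth checking for null before use.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory illustration only; this is not the JMS API.
final class TimedReceiveSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String message) {
        queue.add(message);
    }

    // Waits up to timeoutMillis for a message; returns null if none arrives.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        TimedReceiveSketch consumer = new TimedReceiveSketch();
        consumer.send("IBM 50.23");

        String first = consumer.receive(100);     // a message is already waiting
        String second = consumer.receive(100);    // nothing arrives in time

        System.out.println(first);
        System.out.println(second == null ? "timed out" : second);
    }
}
```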
Receiving messages asynchronously
JMS provides a non-blocking API to receive messages asynchronously. Clients need to implement MessageListener. A typical implementation:
    public class StockListener implements MessageListener {
        public void onMessage(Message message) {
            // process the message ...
        }
    }

    StockListener myListener = new StockListener();
    queueReceiver = queueSession.createReceiver(queue);
    queueReceiver.setMessageListener(myListener);
The Connection must be started for message delivery to begin. J2EE 1.3 added a new type of EJB, the MessageDrivenBean. A MessageDrivenBean implements the MessageListener interface to receive messages asynchronously in an EJB environment.
Message Selection
MessageConsumers may choose to receive only messages that match certain criteria. A message selector can be used for this purpose. Message selectors work against the header fields and properties of a message. MessageProducers assign properties via the setXXXProperty() methods as described earlier, and MessageConsumers create a selector to tell the provider which messages to deliver. Selectors are written in a SQL-like language based on a subset of SQL-92 conditional expressions.
    // Create a map of stocks
    MapMessage message = queueSession.createMapMessage();
    message.setDouble("IBM", 50.23);
    message.setDouble("MSFT", 11.26);

    // Assign a property to this message
    message.setStringProperty("StockSector", "Tech");

    // Send the message
    queueSender.send(message);
In the consumer, define a selector that matches only ‘Tech’ stocks.
    String selector = "(StockSector = 'Tech')";
    queueReceiver = queueSession.createReceiver(queue, selector);
Now this consumer receives only messages that carry a property named ‘StockSector’ whose value is ‘Tech’. Selectors can be written using comparison, logical and arithmetic operators. For example,
    "JMSType = 'car' AND color = 'blue' AND weight > 2500"
selects messages with a message type of car, a color of blue and a weight greater than 2500 lbs. For more information on selector expressions, see the documentation of the Message interface in the JMS API.
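To make the matching rule concrete, the car selector above can be written by hand as a Java predicate over a map of properties. This is only a model of what the provider evaluates, not the JMS selector parser; SelectorSketch and its helpers are hypothetical names, and JMSType (a header field in JMS) is stored in the same map as the properties for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// In-memory illustration only; this is not the JMS API or its selector parser.
final class SelectorSketch {

    // Hand-written equivalent of:
    //   JMSType = 'car' AND color = 'blue' AND weight > 2500
    // A missing value makes the whole condition fail, so the message is skipped.
    static final Predicate<Map<String, Object>> BLUE_HEAVY_CAR = props ->
            "car".equals(props.get("JMSType"))
            && "blue".equals(props.get("color"))
            && props.get("weight") instanceof Number
            && ((Number) props.get("weight")).doubleValue() > 2500;

    static Map<String, Object> properties(String type, String color, int weight) {
        Map<String, Object> props = new HashMap<>();
        props.put("JMSType", type);
        props.put("color", color);
        props.put("weight", weight);
        return props;
    }

    static List<Map<String, Object>> sampleMessages() {
        List<Map<String, Object>> messages = new ArrayList<>();
        messages.add(properties("car", "blue", 3000));   // matches
        messages.add(properties("car", "blue", 2000));   // too light
        messages.add(properties("car", "red", 3000));    // wrong color
        messages.add(new HashMap<>());                   // no properties at all
        return messages;
    }

    static int countMatches(List<Map<String, Object>> messages) {
        int matches = 0;
        for (Map<String, Object> props : messages) {
            if (BLUE_HEAVY_CAR.test(props)) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(countMatches(sampleMessages()));   // 1
    }
}
```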
Conclusion
JMS is a simple yet powerful technology in the J2EE stack. The Point-to-Point and Publish/Subscribe domains address the common messaging problems in an enterprise system. The five message types supported by JMS provide flexibility in transferring various kinds of data. We also discussed how to pause message delivery and how a message consumer can select messages using selectors.