Thursday, January 18, 2007

Message-driven beans

Message-driven beans


EJB Tutorials


Message-driven beans overview


Message-driven processing is a very effective, and often underused, technique for programs that do not need to operate directly in response to user requests (that is, for batch rather than interactive type processing). For example, how can a program know that it should shut down gracefully, or automatically react to some other external event? Messaging is a good solution in these cases. It is also useful for processing that is not time critical, including some interactive applications; why should a user wait for a process to complete when it doesn't matter if the task is done exactly at that moment? Many data entry tasks in which a set of data (debits/credits, invoices, or receipts, for instance) is entered, then reviewed, and finally posted, are appropriate examples. Messaging can also be useful for interprocess communication and systems integration using text or XML data, even when programs are written in different languages. From the messaging client's perspective, it just sends a message and immediately goes on to other tasks, relying on the message's receiver to perform operations against the message data.

Message-driven beans (MDBs) are a new type of bean introduced with the EJB 2.0 specification. MDBs are stateless in terms of client-conversational state. They are used for asynchronous processing of Java Message Service (JMS) messages, and go into action upon the receipt of a client message, although they are not linked to any specific client. In fact, since the messages come from a queue or a topic, a single bean may act on behalf of many clients. In contrast to the other bean types, MDBs have no home or component interfaces, and are invoked by the container in response to the arrival of JMS messages. From the client view, an MDB is a message consumer that performs one or more tasks on the server, based on its interpretation of the message.

Notice that MDBs are all about asynchronicity; session and entity beans could, and still can, process synchronous messages. Prior to the advent of message-driven beans, however, there was no real way that EJB components could process asynchronous messages; generally, an external application was necessary.

Figure 5. The javax.ejb.MessageDrivenBean interface
Interface javax.ejb.MessageDrivenBean

All message-driven beans must implement the javax.ejb.MessageDrivenBean interface, as shown in Figure 5. An MDB's life cycle is very similar to that of a stateless session bean (see Stateless session beans ) in that it only has two states: Does Not Exist and Ready. The container is responsible for creating instances of MDBs before message delivery begins. To place an MDB in the Ready state, the container:

  • Instantiates a bean using the Class.newInstance() method
  • Invokes the bean's setMessageDrivenContext() method
  • Invokes the bean's ejbCreate() method

At this point, the bean enters the method-ready pool and can be called upon to process messages. When the container no longer requires the bean, it calls the bean's ejbRemove() method. After this action, the bean is effectively destroyed and back to the Does Not Exist state.

Figure 6. The javax.jms.MessageListener interface
Interface javax.jms.MessageListener

Message-driven beans must also implement the javax.jms.MessageListener interface, as shown in Figure 6. The interface consists of just one callback method, onMessage(), which has a single javax.jms.Message input argument. The callback feature means that the bean doesn't have to spend CPU cycles and time polling for messages, leading to more scalable and potentially higher performance applications. If you wondered why other bean types couldn't handle asynchronous messages, the reason is that the EJB 2.0 specification states that they are not "permitted" to implement MessageListener. This ban is not simply a specification mandate; several technical reasons exist for the restriction.

onMessage() is really where all of the action occurs, but before discussing that method and message-driven beans further, an understanding of JMS concepts, lingo, and basic operations is required. The next few sections briefly cover the necessary JMS background.




The Java Message Service (JMS)

Enterprise messaging systems (EMS) comprise a product category often used in larger systems to provide assured delivery of messages between participating applications. These products are also known as message oriented middleware, or MOM. As with many software products, the API for virtually every EMS/MOM is vendor proprietary. The JMS specification was written to unify these APIs for the Java developer, and includes both an API and a Service Provider Interface (SPI). Any vendor can use the SPI to write what amounts to a pluggable driver to implement the JMS specification. To the developer, this aspect of JMS means product independence and transparency, and only one standard API to learn.

JMS supports two messaging methodologies (or domains ): point-to-point (PTP) and publish and subscribe (pub/sub). PTP deals with queues, senders, and receivers. There may be many message senders to a queue, but, normally, only one type of receiver (since MDBs can be pooled, there may be several of a given type consuming messages from the queue). Once the message has been received, it is removed from the queue. Pub/sub deals with topics, publishers, and subscribers. There may be any number of publishers for a topic, and any number of subscribers to the topic (similar in concept to Java listeners). Subscribers are notified of a newly published message for a topic, and every subscriber gets a copy. JMS uses the notion of message producers for clients that generate and send or publish messages, and message consumers for clients that receive messages via queues or topics. Queues and topics are known as destinations.

The basic steps in JMS programming are:

  • Obtain a JMS ConnectionFactory. Use a JNDI lookup to obtain a QueueConnectionFactory (PTP) or a TopicConnectionFactory (pub/sub).

  • Obtain a JMS destination. Use a JNDI lookup to obtain a Queue (PTP) or a Topic (pub/sub).

  • Create a JMS Connection. This will either be a QueueConnection (PTP) or a TopicConnection (pub/sub). Use the specific ConnectionFactory.createXXXConnection() method.

  • Create a JMS Session. This will either be a QueueSession (PTP) or a TopicSession (pub/sub). Use the specific Connection.createXXXSession() method.

  • Create a producer or consumer. This can be a QueueSender or a QueueReceiver created using the QueueSession (PTP) or a TopicPublisher or a TopicSubscriber created using the TopicSession (pub/sub).

  • Send and/or receive a Message. Use the producer or consumer in conjunction with the destination. For consumers, call Connection.start() to initiate the flow of incoming messages.

All of the classes referenced above are in the javax.jms package. Although we will only use PTP in the example, these steps apply to programs for both message domains. For more information about JMS and JMS programming, see Resources.




JMS destinations and messages

Both JMS Queue s and Topic s are referred to generally as destinations. From a J2EE perspective, they also fall under a group, along with connection factories, known as administered objects. This means that they must be created and maintained by an administrator, external to a bean or application. It also means using a vendor-proprietary method to perform these tasks. The J2EE RI provides an administration tool, j2eeadmin, which you can use to create queues and topics. We've seen j2eeadmin before, when adding a DataSource entry (see The Rock Survey database ). For destinations, the
-addJmsDestination
input argument is sent to j2eeadmin. The deployment panel for the message-driven bean example program, Deploying the Rock Survey example, take 3, shows how to use j2eeadmin to create a Queue.

Ultimately, messages are the reason for using JMS. The API defines a Message interface, along with subinterfaces for specific message types. A Message is composed of three parts:

  • Header: Header values contain information useful for identifying and routing messages.

  • Properties: Properties are a built-in mechanism for adding application-specific header information. Programs can use this information for message selection and filtering.

  • Body: JMS defines five body types (and corresponding subinterfaces) for a Message:

    • Bytes
    • Map
    • Object
    • Stream
    • Text


    You can read about the details of the types and subinterfaces in the J2EE API documentation. The important thing to consider here is message data portability; clearly a Message containing a Java Object will not be understood by a C language, COBOL, RPG, or other language program. While writing portable code often means doing more work, Java developers, more than any others, should value portability in data as well as code, right? Therefore, you should give careful consideration to message types and their implications as you design applications.



Sending and receiving JMS messages

The following code snippet illustrates how to set up the JMS framework for sending and receiving messages. Notice how the code follows the steps outlined in The Java Message Service (JMS). Most applications, naturally, will only create a QueueReceiver or a QueueSender (or TopicPublisher or a TopicSubscriber for pub/sub). JMS has a transaction framework of its own, and you can specify transaction handling when creating a Session. Here, and in the example application for this section, the code specifies false, meaning no transactions are used. The Session.AUTO_ACKNOWLEDGE argument specifies automatic acknowledgement of the message when the onMessage() method completes. If the message is not acknowledged, it will be resent. You should be aware that the transactional mode can have a major impact on quality of service (QoS) and guaranteed delivery of messages. For more information about JMS transactions, see Resources.

If a QueueConnection calls start() to begin receiving incoming messages, QueueConnection.stop() should be called before exiting the application. QueueConnection.close() should always be called prior to exit to conserve resources. Pub/sub programs look very similar to the following code, the primary difference being that Topic objects are used rather than Queue objects.

import javax.jms.*;
import javax.naming.*;

public class XXX implements MessageListener
{
InitialContext ic;
Queue q;
QueueConnection qc;
QueueConnectionFactory qcf;
QueueReceiver qReceiver;
QueueSender qSender;
QueueSession qSession;
TextMessage tm;

public void setupJMS( String sQueueConnectionFactoryName,
String sQueueName )
{
try
{
// lookup the JMS driver and queue
ic = new InitialContext();
qcf = (QueueConnectionFactory)initContext.lookup(
"java:comp/env/jms/" +
sQueueConnectionFactoryName );
q = (Queue)initContext.lookup(
"java:comp/env/jms/" + sQueueName );

// create the necessary JMS objects
qc = qcf.createQueueConnection();
qSession = qc.createQueueSession(
false, Session.AUTO_ACKNOWLEDGE );
// create the sender
qSender = session.createSender( q );

// create the receiver
qReceiver = qSession.createReceiver( q );
qReceiver.setMessageListener( this );
qc.start();
}
...

} // end setupJMS

public void sendMsg( String sMsgText )
{
try
{
tm = qSession.createTextMessage( sMsgText );
qSender.send( tm );
}
...
} // end sendMsg

public void onMessage( Message msg )
{
try
{
String sMessage = ((TextMessage) msg).getText();
...
}
...
} // end onMessage

...

// if the QueueConnection issued start() for incoming
// messages, invoke stop() prior to exit.

// close the QueueConnection before exiting!!! This
// operation will also close the Session and Sender.

} // end class XXX




Using message-driven beans

After going through the JMS information, you'll probably find it surprisingly simple to code message-driven beans for receiving messages. That's because the container handles all of the steps mentioned in The Java Message Service (JMS) except actually receiving the messages. In general, the same MDB can be used to receive PTP or pub/sub messages by setting the appropriate ConnectionFactory, destination, and destination type on deployment. As is often the case with EJB technology, some of this coding ease translates into more complexity in the deployment descriptor.

An MDB must have a public, no-arg constructor, and is not allowed to throw application exceptions. In addition, an MDB must implement the following methods (for the life cycle sequence, see Message-driven beans overview ):

  • public void setMessageDrivenContext(MessageDrivenContext mdc)
    The container normally calls this method exactly once, after instantiation, to pass in the associated MessageDrivenContext.

  • public void ejbCreate()
    Called after the setMessageDrivenContext() method. This is a good time to access or obtain any resources that will be used for the life of the bean.

  • public void onMessage(Message msg)
    Called by the container when a message has arrived on the bean's associated Queue or Topic. Your code should check for the expected message types, using the instanceof operator: there's nothing to stop a client from sending any of the available message types. Other than those that deal with the Message object, no JMS methods need be invoked by the bean; it just cracks or parses the received message and performs the relevant operations.

  • public void ejbRemove()
    Called when the container intends to terminate the bean. All resources should be released at this time.

Once the message has been received, the MDB can perform all the operations itself, or act as a manager, sending the information to and controlling other beans. Queue architecture is fairly limited, because the specification envisions a queue for each bean type. It also warns against deploying multiple beans against the same queue, primarily citing message order concerns; however, there is no guaranteed order for message receipt in any event. The deployment descriptor does allow for JMS message selectors to direct specific messages to a specific bean type.




Example: The Rock Survey, take 3

After we completed take 2 of the Rock Survey (see Example: The Rock Survey, take 2 and Example: The Rock Survey, take 2 (continued) ), the application really does everything necessary to perform its task. But, for tutorial purposes, we're going to add a messaging layer. This can be a valid design under heavy loads and various other situations and design requirements. In addition, the example may give you some ideas about different approaches to processing in terms of timeliness, the client's view of the speed of an application, and loosely coupled operations.

Take 3 adds a message-driven bean ( RockSurveyMDBean ) and a modification of the existing session bean ( RockSurvey2Bean ). In the process, the application flow changes from a direct update of the datastore by the client application. Now the session bean sends a message, and the message-driven bean asynchronously manages the database update. Again, from the Web component's and end user's perspective there is no change except that the Done button operation returns more quickly.

All of the actual persistence code in the persist() method of RockSurvey2Bean, which in this version becomes RockSurvey3Bean, is removed and placed in RockSurveyMDBean. RockSurvey3Bean.persist() now gathers the data and sends a message containing an initialized RockSurveyData object (see Example: The Rock Survey, take 1 ). Most of the relevant code is very similar to the code snippet in Sending and receiving JMS messages.

First, we declare the JMS-related instance variables:

  // JMS variables for PTP 
ObjectMessage om;
Queue q;
QueueConnection qc;
QueueConnectionFactory qcf;
QueueReceiver qReceiver;
QueueSender qSender;
QueueSession qSession;
TextMessage tm;

In persist(), if the QueueConnection hasn't yet been created, the necessary messaging objects are obtained:

      if( qc == null) 
{
// lookup the administered objects
ic = new InitialContext();
qcf = (QueueConnectionFactory)ic.lookup(
"java:comp/env/jms/QueueConnectionFactory" );
q = (Queue)ic.lookup(
"java:comp/env/jms/RockSurveyQueue" );

// create the necessary JMS objects
qc = qcf.createQueueConnection();
qSession = qc.createQueueSession(
false, Session.AUTO_ACKNOWLEDGE );
// create the sender
qSender = qSession.createSender( q );
} // end if qc is null

Notice that the QueueSession is created as nontransacted with automatic acknowledgement.

At this point, the program is ready to create message objects and send messages. Beginning and ending TextMessages are sent, with the send of the ObjectMessage, which contains the survey data, sandwiched in the middle. The ObjectMessage is loaded with rsd, a RockSurveyData object.

      ...
tm = qSession.createTextMessage(
"sending RockSurveyData to RockSurveyQueue." );
qSender.send( tm );
...
om = qSession.createObjectMessage( rsd );
qSender.send( om );
...
tm.setText( "RockSurveyData sent." );
qSender.send( tm );
...

That's it for the message sending portion. To conserve resources, in ejbPassivate(), the QueueConnection is closed (this also closes the other related JMS objects) and the relevant instance variables are set to null. On termination, ejbRemove() invokes ejbPassivate() for the same purpose.

  public void ejbPassivate() 
{
if( qc != null)
{
try { qc.close(); }
catch (JMSException e) { /* can't do anything */ }
}
om = null;
q = null;
qc = null;
qcf = null;
qReceiver = null;
qSender = null;
qSession = null;
tm = null;
} // end ejbPassivate




Example, take 3: The message-driven bean

In the message-driven bean ( RockSurveyMDBean ), the persistence-related instance variables and the persist(), doCommit(), and doRollback() methods are transferred wholesale from RockSurvey2Bean (see Example: The Rock Survey, take 2 and Example: The Rock Survey, take 2 (continued) ). Since MDBs don't have a SessionContext, the MessageDrivenContext is used instead to get the UserTransaction object:

    ut = mdc.getUserTransaction(); 

And now, what you've all been waiting for: the MDB's onMessage() method:

  public void onMessage( Message msg ) 
{
try
{
if( msg instanceof TextMessage )
{
TextMessage tm = (TextMessage)( msg );
System.out.println( "Message received: " +
tm.getText() );
return;
}

if( msg instanceof ObjectMessage )
{
System.out.println( "RockSurveyData " +
"ObjectMessage received. " );
ObjectMessage om = (ObjectMessage)( msg );
rsd = (RockSurveyData)om.getObject();

if( persist() )
{
System.out.println( "Table data persisted, " );
}
else
{
System.out.println( "Problem persisting data." );
}
return;
}
else
{
System.out.println( "Unknown Message type: " +
msg.getClass().getName() );
}
}
catch( JMSException jmse )
{
System.out.println( "JMSException: " +
jmse.getMessage() );
jmse.printStackTrace();
}
} // end onMessage

onMessage is prepared to handle TextMessage and ObjectMessage type messages. If it receives a TextMessage, the text is extracted and printed. For ObjectMessages, the contained object is retrieved into a RockSurveyData. Then the persist() method is invoked to update the database and the result is printed. Any other message types are noted and then ignored.




Deploying the Rock Survey example, take 3

This time, instead of creating everything from scratch, we'll use a copy of the EAR from the take 2 version and modify it to meet this version's needs. Follow the directions carefully.

  1. Compile Java source files in the Survey3 directory, or copy the .class files from the prod folder.
  2. Start J2EE and deploytool.
  3. Ensure that the Cloudscape database is started.
  4. Create the destination queue:
    • From the command line, key and execute: j2eeadmin -addJmsDestination jms/RockSurveyQueue queue
  5. Create new application:
    • There is a copy of Survey2App.ear from the Survey2 Directory in the Survey3 directory. Rename it to Survey3App.ear.
    • From the menu, select File, Open.
    • Browse to Survey3 folder.
    • Select Survey3App.ear.
    • Click Open Object.
    • Application Displays as Survey2App1.
    • Under the General tab, change the Application Display Name to Survey3App.
    • Select Survey2WAR. Under the General tab, change the WAR Display Name to Survey3WAR.
    • Click Edit, then browse to the Survey3 directory.
    • Select the Survey3 files:
      • The images directory
      • index.jsp
      • RockSurvey.jsp
      • RockSurveyExit.jsp
      • SurveyWelcome.jsp
    • Click Add. Select "Yes to All" for the overwrite warning.
    • Click OK.
    • Select Survey2JAR. Under the General tab, change the JAR Display Name to Survey3JAR.
    • Click Edit, browse to the Survey3 directory.
    • Select the all of the .class files in Survey3.
    • Click Add. Select "Yes to All" for the overwrite warning.
    • Click OK.
    • Select RockSurvey2Bean. From the Edit menu, select Delete. Answer Yes to "Are You Sure?"
    • Click Edit. Under Contents of Survey3JAR, Select RockSurvey2Bean.class, then click Remove. Answer Yes to "Are You Sure?" Click OK.
    • Select SurveyNamesBean. Under the General tab, change the Enterprise Bean Name to SurveyNames3Bean.
    • Select SurveyBean. Under the General tab, change the Enterprise Bean Name to Survey3Bean.
  6. Create new Enterprise JavaBean component -- RockSurveyMDBean:
    • Ensure that Application Survey3App is selected.
    • From the menu, select File, New, Enterprise Bean -- Intro display appears, click Next.
    • Select Add to Existing JAR File.
    • Select Survey3JAR in the drop-down menu. The RockSurveyMDBean.class file was added above. Click Next.
    • Under Bean Type, click Message-Driven.
    • In the Enterprise Bean Class combo box, select the bean implementation: RockSurveyMDBean.
    • In Enterprise Bean Name, accept RockSurveyMDBean.
    • Click Next, accept Bean Managed Transaction Management.
    • Click Next.
    • For Destination Type, select Queue.
    • For Destination, select jms/RockSurveyQueue.
    • For Connection Factory, select jms/QueueConnectionFactory.
    • Click Next; click Finish.
    • RockSurveyMDBean now appears under Survey3JAR. Select the EJB Refs tab; click Add.
    • In the Coded Name column, enter ejb/SurveyNamesBean.
    • Select Entity for Type and Local for Interfaces.
    • For Home Interface, enter SurveyNamesLocalHome.
    • In the Local/Remote Interface column, enter SurveyNamesLocal.
    • At the bottom, select "ejb-jar-ic.jar#SurveyNames3Bean" from the Enterprise Bean Name drop-down menu.
    • Click Add.
    • In the Coded Name column, enter ejb/SurveyBean.
    • Select Entity for Type and Local for Interfaces.
    • For Home Interface, enter SurveyLocalHome.
    • In the Local/Remote Interface column, enter SurveyLocal.
    • At the bottom, select "ejb-jar-ic.jar#Survey3Bean" from the Enterprise Bean Name drop-down menu.
    • Select Survey3JAR, then click the JNDI Names tab.
    • Under EJBs, verify that jms/RockSurveyQueue is entered next to ejb/RockSurveyMDBean.
  7. Create new Enterprise JavaBean component -- RockSurvey3Bean:
    • Ensure that Application Survey3App is selected.
    • From the menu, select File, New, Enterprise Bean -- Intro display appears; click Next.
    • Select Add to Existing JAR File.
    • Select Survey3JAR in the drop-down menu. The .class file was added above. Click Next.
    • Under Bean Type, click Session and Stateful.
    • In the Enterprise Bean Class combo box, select the bean implementation: RockSurvey3Bean.
    • In Enterprise Bean Name, accept RockSurvey3Bean.
    • Select the corresponding interfaces in the combo boxes:
      • For Remote Home Interface, select: RockSurveyRemoteHome.
      • For Remote Interface, select: RockSurveyRemote.
    • Click Next; click Finish.
    • RockSurvey3Bean now appears under Survey3JAR.
    • Select Resource Refs tab; click Add.
    • In the Coded Name field, enter jms/QueueConnectionFactory.
    • For Type, select javax.jms.QueueConnectionFactory.
    • Accept Authentication as Container and check Sharable.
    • In JNDI Name textfield, enter jms/QueueConnectionFactory.
    • Select Resource Env. Refs tab; click Add.
    • In the Coded Name field, enter jms/RockSurveyQueue.
    • For Type, select javax.jms.Queue.
    • In JNDI Name textfield, enter jms/RockSurveyQueue.
    • Select Survey3JAR, then click the JNDI Names tab.
    • Under EJBs, key ejb/RockSurvey3Bean next to RockSurvey3Bean.
    • Verify all other JNDI Name entries.
  8. Modify the Web component:
    • Select Survey3WAR, then select the EJB Refs tab and select the "ejb/RockSurveyBean" row.
    • At the bottom, select "ejb-jar-ic.jar#RockSurvey3Bean" from the Enterprise Bean Name drop-down menu. Then click the JNDI Name button and select ejb/RockSurvey3Bean from the drop-down menu.
    • Now select Survey3App and click on the JNDI Names tab.
    • Verify that all JNDI Name columns have the proper entries.
  9. Deploy the Survey3App:
    • Select Survey3App.
    • Select Tools, Deploy from the menu.
    • Under Object To Deploy, ensure that Survey3App is selected.
    • Under Target Server, we are using localhost. Be sure that is selected.
    • Select the Return Client JAR check box.
    • Select Save object before deploying.
    • Click Next.
    • Verify that the JNDI names are correct, then click Next.
    • Key "/Survey3App" in the context root.
    • Click Next, then Finish.

The Deployment Progress dialog will display. Wait until the progress bars are complete and the Cancel button changes to OK, then click OK. The Survey3App application is deployed.

If you look in the Survey3 folder, you will see that it now contains Survey3App.ear and Survey3AppClient.jar. To run the Survey3App, start both J2EE and Cloudscape, then start up your browser and enter http://localhost:8000/Alice as the target URL. When the Alice's World home page appears, click on the "Alice's Surveys - Take 3" link.

No comments: