IBM Extreme Transaction Processing (XTP) patterns: Fast and scalable asynchronous processing with WebSphere eXtreme Scale

Modern enterprise systems are being confronted with the need for more complex processing, coupled with higher loads and greater demand for resources. This reality is driving a need for separation of functional units and motivating the use of asynchronous methods to accomplish work and aggregate results. This article describes an example that brings the power and scalability of IBM® WebSphere® eXtreme Scale to work on the problem, providing nearly limitless flexibility and expandability to the asynchronous paradigm. This content is part of the IBM WebSphere Developer Technical Journal.

Robert Wisniewski, Technical Evangelist, IBM

Robert Wisniewski is a Software Engineer specializing in performance and scalability. He previously worked on WebSphere Application Server performance for 7 years, focusing on all areas of the product from EJB/JPA to autonomic computing and benchmark design. His current position as Technical Evangelist refocuses this experience on customer scenarios and the application of XTP strategies in the real world.



26 May 2010


Introduction

Even the most casual observers of technology must be aware of the migration toward systems and services that can perform increasingly more complex tasks with more data sources than ever before, and all with an interface that's even easier to use. Mash-up is the term that’s been coined to express this concept. For example, providing comprehensive and detailed street maps is good. Adding current traffic data to the maps is even better. And including the closest place to get lunch with user ratings? Now that’s cool.

Whether it's something for the average consumer or within the massive grids of a financial trading system, the trend is obvious: making more decisions based on more data sources, more quickly, is becoming vital to cutting-edge systems. Fortunately, we're starting to learn exactly what it takes to solve these kinds of problems, and one of the best tools for this evolution is a move from synchronous, linear processing to more responsive asynchronous processing. These asynchronous patterns are especially useful for integrating information from disparate and possibly external data sources. This asynchrony also provides perceived responsiveness for users, as well as better overall throughput for the system as a whole. By delegating tasks to asynchronous requests within a scalable processing infrastructure, you can achieve more efficient system utilization.

When depending upon outside sources of information, it is important to decouple the separate application tiers and link them together more loosely, solving the scaling problem individually for each tier. This is not only an easier problem to solve, but also, in the presence of delay or downtime from the data sources, the client isn't left with an unresponsive or non-functioning mess. This decoupling adds the ability to offload, retry, and prioritize work tasks as needed.

The pitfalls of synchronous processing in a scale-up environment stem from the exhaustion of resources spent waiting on non-responsive or temporarily overloaded components of the infrastructure. In a synchronous scenario, a slowly responding external resource or a fully utilized connection pool will tie up the upstream processing threads until the bottleneck is alleviated. This backup continues upstream all the way to the client experience, as each sequential resource is tied up waiting on the bottleneck.

With an asynchronous model, the queue itself acts as a relief valve. The slow component is isolated behind the queue, and the biggest effect is that requests build up in the queue, which will eventually drain once the slow component catches up. Even if it never does, only the services dependent on the slow resource are affected. This decoupling prevents bottlenecks during scale-up simply because each resource is able to act independently.
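
To make this concrete in plain Java terms, the following minimal sketch uses only java.util.concurrent (not WebSphere eXtreme Scale) to show a queue-backed executor absorbing requests for a slow external dependency while the submitting threads carry on; the callSlowExternalService method is a hypothetical stand-in for such a dependency.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReliefValveExample {

  // A small pool backed by an unbounded work queue; when the external
  // resource slows down, requests build up in the queue instead of
  // tying up the submitting (upstream) threads.
  private static final ExecutorService WORKERS = Executors.newFixedThreadPool(4);

  public static Future<String> submitRequest(final String request) {
    return WORKERS.submit(new Callable<String>() {
      public String call() throws Exception {
        return callSlowExternalService(request);  // hypothetical slow dependency
      }
    });
  }

  private static String callSlowExternalService(String request) throws Exception {
    Thread.sleep(2000);  // simulate a slow or temporarily overloaded resource
    return "result for " + request;
  }
}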


WebSphere eXtreme Scale and asynchronous processing

Although IBM WebSphere eXtreme Scale is most widely known and used as an elastic grid, it has key features that make it a prime candidate for solving the asynchronous problem. One such feature is its ability to deploy a processing agent into a grid and have the grid invoke this "agent" on each entity sent into a particular map. We have taken that capability and generalized it into a set of functions that essentially turn a POJO entity into a "message." You provide the code for the agent, which is the business logic you would like executed asynchronously.

This framework is the WebSphere eXtreme Scale asynchronous service framework, a library that you can use with WebSphere eXtreme Scale in your applications to process work asynchronously in a reliable and scalable manner. Because this framework is in fact a library that uses standard WebSphere eXtreme Scale capabilities to perform this function (as any WebSphere eXtreme Scale deployment could), we will refer to it as a sample. The intention of the sample, however, is to hide some of the details and provide an intuitive interface for deploying this asynchronous processing without a lot of hassle.

There are several benefits to using WebSphere eXtreme Scale over the established frameworks (such as JMS and other messaging infrastructures) for asynchronous processing:

  • Simplicity: WebSphere eXtreme Scale and the asynchronous service framework can be deployed as a few JAR files in the classpath of a Java™ SE environment.
  • Scalability: The processing of the asynchronous work happens on the grid nodes themselves. This could be the same JVM that issued the request, or -- even better -- it could be one of thousands of grid partitions deployed. The flexibility of WebSphere eXtreme Scale is such that if you need the processing to happen faster, you need only deploy more JVMs to the grid processing the messages. This is what we characterize as elasticity, making the asynchronous service framework an elastic queue.
  • Reliability: Just as WebSphere eXtreme Scale replicates and handles application objects, the same function is applied on the asynchronous messages within the grid. You can eliminate single points of failure with the processing system given an adequately deployed and replicated grid.

Most of the benefits above can be attained in some form with other asynchronous frameworks, but usually with greater expense and a steeper learning curve.

Figure 1. WebSphere eXtreme Scale asynchronous service framework architecture

An example

Let's implement a simple "Hello World" application to illustrate the simplicity of the WebSphere eXtreme Scale asynchronous service framework. This example will dispatch a message into the grid, identify which grid partition has been assigned to process the message, and return that information back to the dispatcher. While this is a simple example, you'll see the core concepts in action, which you can apply to a wide variety of applications.

Figure 2. Developer's view of the framework

The message POJO

First, you need to define the message itself, as well as the business logic to be processed. Do this by creating a POJO that implements the Serializable interface. This interface indicates that the object can be serialized and sent as a parameter on a remote method call, which is the basic requirement for any object inserted into a WebSphere eXtreme Scale grid.

Listing 1. Sample message object
package com.ibm.websphere.asyncservice.test;

import java.io.Serializable;

public class TestMessage implements Serializable {


  private static final long serialVersionUID = 7894711850787375307L;

  public int id;

  public TestMessage(int id){
    this.id = id;
  }

}

This object serves as the data container for the work that needs to be done upon receipt of the message in the grid. In this case, the message carries a single int field that is used as a unique identifier and enables you to track the progress of a message through the grid.

When such an instance is inserted into the grid, the grid partition that receives the message will execute the logic in the message listener, using the data in the instance, via a local pool of threads devoted only to processing these requests. We'll call this pool a processing unit, with one pool hosted in each shard. Configuration of the processing unit can be achieved with some use of the Spring framework (to be covered in an upcoming article). By default, each processing unit uses three threads, and since this is replicated for each partition, the default should be adequate for many types of applications.

The listener (MessageProcessor)

To implement the message processing logic, you need only create a class that implements the com.ibm.websphere.objectgrid.asyncservice.MessageProcessor interface, which requires the implementation of a single method called onMessage. This method is passed a Session connected to the grid where the message was sent, the unique message ID, and the message itself as a Serializable object. The code for this is shown in Listing 2.

Listing 2. Sample message processor
package com.ibm.websphere.asyncservice.test;

import java.io.Serializable;

import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.asyncservice.MessageProcessor;

public class TestProcessor implements MessageProcessor<Integer> {

  public Integer onMessage(Session session, String msgId, Serializable msg){

    int partitionId = session.getObjectGrid().getMap("Queue").getPartitionId();
    TestMessage testMsg = (TestMessage) msg;
    System.out.println("SHARD: Message " + testMsg.id + " processed by partition " 
      + partitionId);
    return partitionId;

  }

}

This example returns an Integer object that identifies the processing partition's ID to the process or thread that initially sent the message. The asynchronous service framework takes this value and fulfills the Future object that was returned to the client, so any client process interested in this value can consume the result.

Future is a type introduced in the Java 5 concurrency framework and is specifically meant to be "filled in" later; it provides methods to check whether the value has been assigned, or to block and wait until it is. The eventual return type is not fixed by any of the asynchronous service interfaces or implementation classes, so it is your choice and can be any serializable data type.

One particular detail to note is that there is a single instance of a MessageProcessor for each processing unit partition. The asynchronous service framework ensures this by utilizing WebSphere eXtreme Scale's shard event listener interfaces. This one instance is used for all messages that are sent into the partition, so members of this class must be thread safe; this also provides the opportunity to cache required resources. You would do that here, for example, if the message processor needs to perform a datasource lookup or establish some other expensive resource connection. By keeping it as a member field in this class, you can safely cache these expensive resources.
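
For illustration only, here is a minimal sketch of a processor that lazily caches a JDBC DataSource in a member field; the JNDI name jdbc/OrdersDS and the JDBC work are hypothetical and not part of the shipped sample.

package com.ibm.websphere.asyncservice.test;

import java.io.Serializable;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;

import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.asyncservice.MessageProcessor;

public class CachingProcessor implements MessageProcessor<Integer> {

  // Cached across all messages handled by this partition's processing unit.
  // Marked volatile because the processing unit uses multiple threads.
  private volatile DataSource dataSource;

  private DataSource getDataSource() throws NamingException {
    DataSource ds = dataSource;
    if (ds == null) {
      // Hypothetical JNDI name; resolve once and reuse for later messages.
      ds = (DataSource) new InitialContext().lookup("jdbc/OrdersDS");
      dataSource = ds;
    }
    return ds;
  }

  public Integer onMessage(Session session, String msgId, Serializable msg) {
    TestMessage testMsg = (TestMessage) msg;
    try {
      // Use the cached DataSource for the expensive external interaction.
      java.sql.Connection conn = getDataSource().getConnection();
      try {
        // ... perform JDBC work driven by testMsg.id ...
      } finally {
        conn.close();
      }
    } catch (Exception e) {
      throw new RuntimeException("Failed to process message " + msgId, e);
    }
    return testMsg.id;
  }
}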

The client

Let's take a look at what you would do to send one of these messages into the grid and process the result.

Listing 3. Asynchronous framework client example
AsyncServiceManager<Integer> aservice = new AsyncServiceManagerImpl<Integer>(clientGrid);

TestMessage msg10 = new TestMessage(10);
Future<Integer> response = aservice.sendAsyncMessage(msg10);

System.out.println("{CLIENT: Message " + msg10.id + " was successfully processed by partition " 
  + response.get() + ".}");

Apart from the brief code segment in Listing 3, there is also a bit of code needed to create the connection to the grid represented by the clientGrid object, which will be covered next when we discuss WebSphere eXtreme Scale deployment details.

Once the message has been sent, the sender can store a reference to the Future object and check periodically for completion, or it can block and wait if needed by calling the get() method. The code in Listing 3 shows a blocking call, in which the client waits until the message has been processed before reporting which grid partition was chosen to consume it.
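
As an illustration (not part of the sample), the sketch below shows both styles against the response Future from Listing 3; the 30-second timeout is an arbitrary choice.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ResponseHandling {

  // Poll the Future without blocking; returns null if the message
  // has not been processed yet.
  static Integer pollForPartition(Future<Integer> response)
      throws InterruptedException, ExecutionException {
    return response.isDone() ? response.get() : null;
  }

  // Block for a bounded time; a TimeoutException means the message
  // has not been processed yet, so the caller can retry or report a delay.
  static Integer waitForPartition(Future<Integer> response)
      throws InterruptedException, ExecutionException, TimeoutException {
    return response.get(30, TimeUnit.SECONDS);
  }
}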

The source code and deployment files for this example are included with this article for download. In addition, you’ll find links for obtaining the asynchronous service framework as well as a fully functional trial version of WebSphere eXtreme Scale in the Resources section.

The grid

The included sample grid can be started and deployed in two ways. The first is a simple in-process catalog service and container, which is convenient for development environments and quick testing. For the second, scripts are provided to start standalone grid containers, which can be spread across a cluster of machines. This would be the standard type of deployment for a production or performance testing environment.

  • In-process grid

    Deploying and running the sample with an in-process grid is the easiest way to see these concepts in action. A script is provided to run the client. When this script is run with no arguments, the grid is automatically deployed in-process. No other changes are necessary. If you've imported the code into Eclipse or a similar IDE, simply run the main method from the com.ibm.websphere.asyncservice.test.TestClient class to accomplish the same function.

  • Remote grid

    Start by launching a catalog service, which is described in the scripts included with the sample. You can find these scripts in the bin directory of the sample code and need only edit env.sh to set the JAVA_HOME variable to the path of your desired JVM and to set the directory where the WebSphere eXtreme Scale product code is located.

    To start the catalog service, simply execute the runcat.sh script.

    Some deployment information is required for the grid JVMs themselves, but they need little or no customization to work in nearly any environment. The names of the maps and grids are used by the asynchronous service framework itself and do not need to change. This information is stored in the deployment.xml and objectgrid.xml files in the sample, and you can use them to start your grid JVMs. All of these configuration details are handled in the provided scripts, so to start a container JVM, run the command: runcontainer.sh <your unique server name>.

    With your server processes started, you can connect to and use the grid by retrieving your reference to the WebSphere eXtreme Scale grid in the client with the code shown in Listing 4.

Listing 4. Grid client example
// Connect to the Catalog Server.  The security and client override XML are not specified
ClientClusterContext ccc = ObjectGridManagerFactory.getObjectGridManager()
                                                      .connect("localhost", null, null);

// Retrieve the ObjectGrid client connection and return it.
ObjectGrid grid = ObjectGridManagerFactory.getObjectGridManager()
                                             .getObjectGrid(ccc, "Grid");

Based on the standard WebSphere eXtreme Scale development pattern, you first get a context object from the catalog service (here deployed on localhost, which should be changed or parameterized for a remote catalog service). Then, you get a reference to the grid, which is subsequently provided to the AsyncServiceManagerImpl constructor shown in Listing 3 to send the asynchronous messages.

The runclient.sh script can also be used to run the sample against a remote grid by providing the hostname and port to the processing unit grid's catalog service as a single parameter. For example, execute the command runclient.sh localhost:2809 to run the client against a grid deployed on the local machine.
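
Putting Listings 3 and 4 together, a complete client might look roughly like the sketch below; the class name, the handling of the catalog endpoint argument, and the package of the AsyncServiceManager classes are assumptions rather than the exact code shipped with the sample.

package com.ibm.websphere.asyncservice.test;

import java.util.concurrent.Future;

import com.ibm.websphere.objectgrid.ClientClusterContext;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
// Package assumed from the MessageProcessor interface; adjust to the sample's actual package.
import com.ibm.websphere.objectgrid.asyncservice.AsyncServiceManager;
import com.ibm.websphere.objectgrid.asyncservice.AsyncServiceManagerImpl;

public class SimpleAsyncClient {

  public static void main(String[] args) throws Exception {
    // Catalog service endpoint, for example "localhost:2809"; defaults to localhost.
    String catalogEndpoint = (args.length > 0) ? args[0] : "localhost";

    // Connect to the catalog service and obtain the client grid reference (Listing 4).
    ClientClusterContext ccc = ObjectGridManagerFactory.getObjectGridManager()
        .connect(catalogEndpoint, null, null);
    ObjectGrid clientGrid = ObjectGridManagerFactory.getObjectGridManager()
        .getObjectGrid(ccc, "Grid");

    // Dispatch a message and block for the processing partition's ID (Listing 3).
    AsyncServiceManager<Integer> aservice = new AsyncServiceManagerImpl<Integer>(clientGrid);
    TestMessage msg = new TestMessage(10);
    Future<Integer> response = aservice.sendAsyncMessage(msg);

    System.out.println("{CLIENT: Message " + msg.id
        + " was successfully processed by partition " + response.get() + ".}");
  }
}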

Runtime

The purpose of this design is to maximize scalability and reliability. From a scalability perspective, you need only start additional JVMs to participate in the grid, and they will automatically begin servicing asynchronous messaging requests. Adding capacity is as simple as starting a new JVM with the same command line shown above, save for a new unique server name, which immediately provides additional processing resources.

Reliability is already handled within the grid. As configured, each message is replicated once in another grid JVM and will be processed there if a primary partition goes down. As with any typical WebSphere eXtreme Scale grid, the replication behavior is configurable in the deployment XML. In the sample as packaged, the grid is configured to synchronously perform a single replication of the message if another container is available. This service is an assured, deliver-once mechanism whose reliability is limited only by the number of JVMs available for replication.

An important detail to mention is that "deliver once" is within the context of your message listener implementation. The asynchronous service framework will deliver and commit the message in the grid only once; however, because of failure windows, it is possible that the actual logic in the listener might be executed more than once. This means that if the listener logic changes any state in an external data source (database, mainframe, and so on), then these interactions must be treated with an "at least once" delivery paradigm. To be clear, the onMessage method will run to successful completion only once, but it might be attempted multiple times and, as such, portions of the method might be executed multiple times. For example, if a container JVM fails while processing a message, the onMessage method will be executed again on the new primary partition once it is promoted. Any code that might have an effect on outside resources must account for this possibility.


Transactional considerations

This framework, as well as WebSphere eXtreme Scale on which it is based, does not formally participate in JTA transactions (WebSphere eXtreme Scale has its own transaction manager). However, the standard "nesting" technique can be used to make the processing of a message a logical part of a JTA transaction. That is, your listener (MessageProcessor) can be written to perform the following steps in this order, ensuring that each step has succeeded before beginning the next:

  1. Begin a JTA transaction.
  2. Process a message.
  3. Mark the message as processed (or delete it from the queue).
  4. Commit the JTA transaction.

If any step fails (for example, you fail to successfully mark the message as processed), you can roll back the JTA transaction and the message can validly be processed again. This logic makes the act of processing a message a logical participant in the JTA transaction.
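
As an illustration only, here is a minimal sketch of a MessageProcessor that follows these steps; the UserTransaction lookup, the processOrder helper, and the markProcessed idempotency record are hypothetical and not part of the sample.

package com.ibm.websphere.asyncservice.test;

import java.io.Serializable;
import javax.naming.InitialContext;
import javax.transaction.UserTransaction;

import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.asyncservice.MessageProcessor;

public class TransactionalProcessor implements MessageProcessor<Integer> {

  public Integer onMessage(Session session, String msgId, Serializable msg) {
    TestMessage testMsg = (TestMessage) msg;
    try {
      // 1. Begin a JTA transaction (standard Java EE lookup).
      UserTransaction tx = (UserTransaction)
          new InitialContext().lookup("java:comp/UserTransaction");
      tx.begin();
      try {
        // 2. Process the message (hypothetical business logic).
        processOrder(testMsg);

        // 3. Mark the message as processed, for example by recording msgId
        //    in a hypothetical idempotency table within the same transaction.
        markProcessed(msgId);

        // 4. Commit the JTA transaction.
        tx.commit();
      } catch (Exception e) {
        // Any failure rolls back the JTA work; the message can validly
        // be processed again under the "at least once" paradigm.
        tx.rollback();
        throw e;
      }
    } catch (Exception e) {
      throw new RuntimeException("Processing failed for message " + msgId, e);
    }
    return testMsg.id;
  }

  private void processOrder(TestMessage msg) { /* hypothetical business logic */ }

  private void markProcessed(String msgId) { /* hypothetical idempotency record */ }
}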


Conclusion

This article introduced the WebSphere eXtreme Scale asynchronous service framework, which is available as a sample for deployment in your environment. The example shown here demonstrated how this new capability of WebSphere eXtreme Scale can help solve some of the more troubling and challenging growth issues in an ever-changing transaction processing environment. Asynchronous patterns, and the scalable environments they create, will be essential in many evolving systems.

The asynchronous service framework is available as a sample in the WebSphere eXtreme Scale samples gallery. This repository of patterns and examples is one of our most valuable channels for helping users get the most from these new technologies.

Resources


Discuss

  • Engage with the development community as well as the eXtreme Scale product team! Get your questions answered at the WebSphere XD forum
