Processing complex business events with WebSphere eXtreme Scale, Part 1: Reducing the load on WebSphere Business Events

This two-part series shows how IBM® WebSphere® eXtreme Scale can be used to improve performance and scalability when processing complex business events. Part 1 introduces a traffic monitoring scenario and uses ObjectGrid on a stand-alone server to show how the scenario works. eXtreme Scale is used to pass high-volume raw events to a prefiltering application, which significantly reduces the processing load on Business Events. Part 2 builds on this scenario and uses a distributed ObjectGrid to partition the events based on the Business Events context ID. This introduces high availability and scalability, as events can be processed by multiple Business Events instances.

Daniel McGinnes (mcginnes@uk.ibm.com), Performance Specialist, IBM

Daniel McGinnes is a Performance Specialist at IBM in the UK Lab at Hursley. He works on measuring and tuning performance of several WebSphere products, including WebSphere Application Server, WebSphere Enterprise Service Bus, and WebSphere Business Events. He previously worked as an IT specialist providing consultancy to IBM customers across EMEA and USA.



29 October 2008


Introduction

WebSphere Business Events (hereafter referred to as Business Events) helps businesses detect, evaluate, and respond to the impact of business events based on the discovery of actionable event patterns. It enables business users to define and manage business events so that they can take timely, proactive actions. It also reduces the total cost of ownership (TCO) through codeless implementations by business users, often without incurring IT development or implementation costs. However, processing these complex events can be a CPU-intensive operation that requires a highly scalable solution.

This two-part series shows how WebSphere eXtreme Scale (hereafter referred to as eXtreme Scale) can be used to improve the performance and scalability in situations that require complex processing of large volumes of events. Part 1 uses ObjectGrid on a stand-alone server to process events from an RFID-based traffic monitoring system. A prefiltering application in eXtreme Scale significantly reduces the number of events that need to be processed by Business Events. Part 2 builds on this scenario and uses a distributed ObjectGrid to partition the events based on a context ID. This allows events to be processed by multiple Business Events instances, improving both scalability and availability.

The traffic alerter scenario

Our example scenario is a traffic management system based on Radio Frequency Identification (RFID) tags in motor vehicles. In this potential use of RFID technology, vehicles contain an RFID tag. RFID sensors are located every half mile along motorways. Every time a vehicle passes an RFID sensor an event is sent to our event processing system, which can calculate the speed of every vehicle at each location along the road. If the system detects a pattern of slow-moving traffic along a section of road it can send alerts to highway patrol to investigate, or automatically put alerts on overhead signs. This scenario could involve a large number of events being sent to our event processing system. The event processing system requires some complex event processing, as we do not want to send an alert if just a single slow-moving vehicle is detected (the vehicle may have malfunctioned or simply stopped). We need to check for multiple occurrences of slow vehicles in a short period of time. Further, we must take into account accident locations that the Highways Agency is already aware of.

Figure 1. Traffic alerter scenario

Solution architecture

Two major requirements govern the design of our solution:

  • The ability to process a large volume of events
  • The ability to deal with complex event processing

With Business Events V6.1 you can define complex event processing logic with a minimum amount of development time. Remember that the events being sent into our event processing system do not contain the speed of the vehicles, only the time they pass a location. The event processing system would need to store the time each RFID tag was at a specific location, and then calculate the speed from the next event. Business Events is designed to process complex events, not to store and process large amounts of data. Therefore it makes sense to place an application layer in front of Business Events that pre-processes the events, calculates the speed at each location, and then forwards the results to Business Events.
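The speed calculation described above can be sketched in a few lines of Java. This is a minimal illustration rather than the application's actual code: the class and method names are hypothetical, and we assume that speeds are expressed in miles per hour, which follows from the half-mile sensor spacing in the scenario.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Sketch: derive a vehicle's speed from two sensor timestamps taken half a mile apart. */
final class SpeedSketch {
    // Sensors are located every half mile, as stated in the scenario.
    private static final BigDecimal HALF_MILE = new BigDecimal("0.5");
    private static final BigDecimal MILLIS_IN_HOUR = new BigDecimal(3_600_000);

    /** Returns the speed in miles per hour (an assumed unit), rounded to the nearest whole number. */
    static int speedMph(long previousMillis, long currentMillis) {
        long elapsed = currentMillis - previousMillis;
        if (elapsed <= 0) {
            throw new IllegalArgumentException("timestamps must be strictly increasing");
        }
        // Convert the elapsed time to hours, then divide distance by time.
        BigDecimal hours = new BigDecimal(elapsed).divide(MILLIS_IN_HOUR, 10, RoundingMode.HALF_UP);
        return HALF_MILE.divide(hours, 0, RoundingMode.HALF_UP).intValue();
    }

    public static void main(String[] args) {
        System.out.println(speedMph(0L, 30_000L)); // half a mile covered in 30 seconds
        System.out.println(speedMph(0L, 60_000L)); // half a mile covered in 60 seconds
    }
}
```

A vehicle that takes 30 seconds between sensors is travelling at 60, and one that takes 60 seconds is travelling at 30, which would fall below a threshold of 40.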

eXtreme Scale V6.1 has a component called the ObjectGrid. The ObjectGrid provides a way to store large amounts of data in memory, and to access that data very efficiently. It also provides partitioning and replication of the data, which allows improved scalability and high availability. For these reasons we can use the ObjectGrid to store the time each RFID passes a location, and so quickly calculate the speed at each location when an event is received. We could then forward these speeds to Business Events to do the event processing. However, although we have eliminated the need for Business Events to calculate the speeds at each location, we would still be sending a large volume of events; possibly more events than a single instance of Business Events could process. Note that with Business Events V6.1 we cannot easily cluster the Business Events instances.

Here is a recap of our design constraints:

  • We have a large number of events that consist of a location and a speed.
  • We need to filter these events to detect events with speeds less than a threshold occurring in a short period of time.
  • We need to check if the event is at a location where a problem is already known.

In analysing the scenario, we would expect that the vast majority of events will have speeds above the threshold. If an event has a speed above the threshold then we are not interested in it anymore, and no further processing is required. But when a speed is below the threshold we need to perform the complex event processing to check whether it is indeed an incident, and not just an anomaly or a known problem. The check of whether the speed is above or below the threshold is a very simple one and does not require any complex event processing. As a result, we can perform this check in our eXtreme Scale application and avoid having to process the vast majority of events with Business Events. This should greatly improve the performance of our event processing solution.
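To make the effect of this check concrete, here is a minimal sketch of a threshold prefilter in plain Java. The class name and the sample speeds are made up for illustration, and the threshold of 40 is borrowed from the SpeedBelow40 filter that this article describes for Business Events.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the prefilter idea: only speeds below the threshold go on for complex processing. */
final class PrefilterSketch {

    /** Returns the speeds that are below the threshold, in arrival order. */
    static List<Integer> belowThreshold(int[] speeds, int threshold) {
        List<Integer> slow = new ArrayList<>();
        for (int speed : speeds) {
            if (speed < threshold) {
                slow.add(speed);
            }
        }
        return slow;
    }

    public static void main(String[] args) {
        int[] speeds = {70, 65, 38, 72, 55, 12, 68, 61}; // made-up sample data
        List<Integer> forwarded = belowThreshold(speeds, 40);
        System.out.println(forwarded.size() + " of " + speeds.length
                + " events forwarded: " + forwarded);
    }
}
```

With this sample data only two of the eight events would need to reach Business Events. The real ratio depends entirely on traffic conditions.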

One additional decision is how to perform the initial filtering in the eXtreme Scale application. We could use a simple conditional if expression. This option would be efficient; however, a change to the threshold would impact the whole application. It would also mean that a thread processing the incoming events would be tied up constructing and sending an event to Business Events whenever one was required. eXtreme Scale provides an efficient method for passing data between applications using a FIFO queue (see the section "Create the application and filter" for more information). This gives us an efficient way to keep our application and filter separate. Passing work items to our filter also enables us to partition the data and take advantage of ObjectGrid clustering. Part 2 of this series shows how to modify the application to use a distributed ObjectGrid for improved scalability and high availability. Figure 2 shows the solution architecture for part 1.

Figure 2. Traffic alerter solution architecture

As shown, Business Events and eXtreme Scale are running in separate WebSphere Application Server Network Deployment (hereafter referred to as Application Server) instances. You can deploy both Business Events and eXtreme Scale in the same Application Server instance. However, to follow best practices, we decided to run them in separate Application Server instances. Keeping the application separate from the event processing parts of the solution offers two advantages:

  • Both Application Server and Business Events could be used for different applications within the total IT solution; keeping them separate avoids potential problems with one server affecting the other.
  • The servers can be run on separate hardware, taking advantage of extra CPU power.

As mentioned earlier, part 2 shows how to use ObjectGrid to partition requests based on the location. Each partition then forwards requests to a different Business Events server. Because each event location will always be in the same partition, and so get sent to the same Business Events instance, this gives us a method of scaling up the number of Business Events instances we can use to process complex events.
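The property that matters here is that the mapping from location to partition is deterministic. The following sketch shows one simple way to get that property, using a hash of the location ID modulo the number of partitions. This is an assumption made for illustration only; it is not a description of how ObjectGrid itself assigns keys to partitions, and the class and method names are hypothetical.

```java
/** Sketch: route every event for a given location to the same partition. */
final class PartitionSketch {

    /** Returns a partition index in the range 0 to numberOfPartitions - 1. */
    static int partitionFor(String locationId, int numberOfPartitions) {
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(locationId.hashCode(), numberOfPartitions);
    }

    public static void main(String[] args) {
        // Hypothetical location IDs; the same ID always yields the same partition.
        for (String location : new String[] {"M3-J4", "M3-J5", "M3-J4"}) {
            System.out.println(location + " -> partition " + partitionFor(location, 4));
        }
    }
}
```

Because the same location always lands in the same partition, the per-location counting done by Business Events stays correct even when several instances share the load.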

Install and configure the products

The following software levels were used to create and run the application:

  • IBM Rational Application Developer V7.0.0.2 (hereafter referred to as Application Developer).
  • WebSphere Business Events V6.1 with PK66053 applied. Installing Business Events also installs WebSphere Application Server V6.1.0.15.
  • IBM DB2 V9.5.
  • WebSphere eXtreme Scale V6.1.0.2 with PK67016 applied. (The prerequisite for this is an existing WebSphere Application Server installation. We installed a stand-alone V6.1.0.17 Application Server.)

Install WebSphere Business Events V6.1

During the installation we used the "Default install set", as the default settings are suitable for our requirements. The installer asks which database server you want to use. For production environments, IBM DB2 V9.5 is the recommended database server. See Resources for installation instructions for Business Events V6.1.

Install WebSphere eXtreme Scale V6.1

To install eXtreme Scale:

  1. Install a stand-alone V6.1 Application Server environment.
  2. Locate the latest fix pack for Application Server and install it. We used Fix Pack 6.1.0.17.
  3. Install eXtreme Scale V6.1 into the location where you installed Application Server V6.1. We did not install the Centralized Installation Manager when asked if we wanted to do so.
  4. Locate the latest fix pack available for eXtreme Scale and apply it to your eXtreme Scale installation. We installed Fix Pack 2 (V6.1.0.2) and APAR PK67016. See Resources for a link to installation instructions and fix packs for Application Server and eXtreme Scale.

At this point you should have installed both Business Events V6.1 and eXtreme Scale V6.1 at the latest maintenance level. You can examine the samples and tutorials for each product to familiarise yourself with the capabilities of both products.

Create the complex event processing logic

Rather than guide you step-by-step through creating the event processing logic, we provide you with a Business Events XML file (TAProject.xml) supplied in the TrafficAlerterExecutables.zip file. This section walks you through the main parts of the event processing logic.

Create intermediate objects and touchpoints

Business Events enables real-time event coordination between business systems (referred to as touchpoints). In this scenario, the Highways Agency and the eXtreme Scale server are touchpoints. Touchpoints use intermediate objects, which are representations of business objects, to share data.

Start the Business Events Design Data UI and import the TAProject.xml file. The Intermediate Object section contains one object called SpeedLoc. This object has two fields: location and speed. The location and speed are derived from the event. Figure 3 shows the SpeedLoc intermediate object.

Figure 3. SpeedLoc intermediate object

There are three touchpoints defined with five actions and five events (Figure 4).

Figure 4. Touchpoints

Examine the WXSServer touchpoint. The SpeedEvent defines where we receive events from our eXtreme Scale prefilter. The Connection is defined as Connector, as the eXtreme Scale filter writes directly to the Business Events Event topic.

The TestFinished event and the AllEventsProcessed action are used by our test application so that it knows when all events have been processed by Business Events. (See Test Finished processing.) The AllEventsProcessed action uses a Message Queue Connection to write an action to a Java Message Service (JMS) queue on the eXtreme Scale server. You need to edit the action properties so that the Provider URL matches your eXtreme Scale server.

To edit the action properties:

  1. Right-click AllEventsProcessed, and select Action Properties.
  2. Click the Connection tab. On the Connection tab click Configure.
  3. Click Provider... to modify the URL. The port number should be the BOOTSTRAP_ADDRESS port of your eXtreme Scale server (Figure 5).
Figure 5. Modifying the provider URL

On the HighwaysAgency touchpoint, note the KnownAccident event and action, and the AccidentCleared event and action. Recall that we want to take into account accident locations that the Highways Agency already knows about. These events provide a way for the Highways Agency to tell us about known accidents, and when the accidents have been cleared. The LowSpeedAlert action (under HighwaysAgency) is triggered if we believe there is slow traffic at a location. This would probably be a Web service call or an email in real life, but for ease of testing we defined this as a file system connection. You may need to edit the properties of this connector so that it matches your file system.

The LowSpeedEvent and LowSpeedAction (under InternalTouchPoint) are explained further when we describe the filters.

Create filters and interaction sets

This section describes the filters and interaction sets that make up our complex event processing logic. In Business Events, a filter is a piece of business logic. An interaction set is a group of related filters that handle a common event or action. The interaction set implements the business logic.

Close the Design Data UI, and open the TAProject.xml file in the Design UI. To display definitions for filters or interaction sets, expand Filters or InteractionSets in the project hierarchy, and select the desired item.

The SpeedBelow40 filter checks if the speed is below 40. Even though we intend to prefilter events and perform this check in eXtreme Scale we still check for this in Business Events. If we didn't also perform this check here then we would risk sending alerts that were incorrect in the case of a malfunction or error in the prefilter application. This could be referred to as a "belt and braces" approach.

Figure 6. SpeedBelow40 filter

The Not known problem filter ensures that the event isn't a known problem. It checks for occurrences of the KnownAccidentEvent. If there have been any of these events this filter checks that the event hasn't been cancelled by an AccidentClearedEvent.

Figure 7. Not known problem filter
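The logic behind the Not known problem filter can be pictured with a small Java sketch. In Business Events the filter is defined in the Design UI rather than in code, so this is only an illustration of the rule, with a hypothetical class and method names: a location counts as a known problem from the moment a known accident is reported until it is cleared.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of the "Not known problem" rule for accident locations. */
final class KnownProblemSketch {
    private final Set<String> knownAccidentLocations = new HashSet<>();

    /** Corresponds to receiving a KnownAccident event for a location. */
    void knownAccident(String locationId) {
        knownAccidentLocations.add(locationId);
    }

    /** Corresponds to receiving an AccidentCleared event for a location. */
    void accidentCleared(String locationId) {
        knownAccidentLocations.remove(locationId);
    }

    /** True when no uncleared accident is known at this location. */
    boolean isNotKnownProblem(String locationId) {
        return !knownAccidentLocations.contains(locationId);
    }

    public static void main(String[] args) {
        KnownProblemSketch agency = new KnownProblemSketch();
        agency.knownAccident("M3-J4"); // hypothetical location ID
        System.out.println(agency.isNotKnownProblem("M3-J4")); // known accident, so false
        agency.accidentCleared("M3-J4");
        System.out.println(agency.isNotKnownProblem("M3-J4")); // cleared, so true
    }
}
```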

The MultipleLowSpeeds filter checks for at least nine occurrences of the LowSpeed action in the space of a minute. It also checks that there haven't been any occurrences of the LowSpeedAlert in the last five minutes. This means that only one LowSpeedAlert is sent every five minutes. This prevents flooding the HighwaysAgency with alerts when there is a problem.

Figure 8. MultipleLowSpeeds filter
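The two conditions in the MultipleLowSpeeds filter amount to a sliding time window plus a quiet period. The sketch below shows that logic in plain Java for a single location. It is an illustration of the rule only, not of how Business Events evaluates filters internally, and the class name, method name, and constants are hypothetical names for the values quoted above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch: alert on at least nine low speeds in a minute, at most once every five minutes. */
final class LowSpeedWindowSketch {
    private static final int MIN_OCCURRENCES = 9;
    private static final long WINDOW_MILLIS = 60_000L;       // one minute
    private static final long ALERT_QUIET_MILLIS = 300_000L; // five minutes

    private final Deque<Long> lowSpeedTimes = new ArrayDeque<>();
    private Long lastAlertMillis = null; // null until the first alert is sent

    /** Records a low-speed occurrence; returns true if an alert should be sent now. */
    boolean onLowSpeed(long nowMillis) {
        lowSpeedTimes.addLast(nowMillis);
        // Discard occurrences that have fallen out of the one-minute window.
        while (!lowSpeedTimes.isEmpty() && nowMillis - lowSpeedTimes.peekFirst() > WINDOW_MILLIS) {
            lowSpeedTimes.removeFirst();
        }
        boolean enough = lowSpeedTimes.size() >= MIN_OCCURRENCES;
        boolean quiet = lastAlertMillis == null || nowMillis - lastAlertMillis >= ALERT_QUIET_MILLIS;
        if (enough && quiet) {
            lastAlertMillis = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        LowSpeedWindowSketch window = new LowSpeedWindowSketch();
        // Ten slow vehicles, one second apart (made-up timings).
        for (int i = 0; i < 10; i++) {
            System.out.println("event " + (i + 1) + " alert=" + window.onLowSpeed(i * 1000L));
        }
    }
}
```

In this run only the ninth event produces an alert. The tenth is suppressed because an alert was sent less than five minutes earlier.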

The AccidentCleared and KnownAccident interaction sets are used to allow the Highways Agency to notify us of known accidents, and to tell us when they are clear. We need these interaction sets because our Not known problem filter checks for the number of occurrences of these events. To apply this filter we must reference the events in an interaction set.

Figure 9. AccidentCleared and KnownAccident interaction sets

The CheckForLowSpeed interaction set is driven by a SpeedEvent from the WXSServer touchpoint. The filter checks whether the speed is below 40. If the request passes this filter then a LowSpeed action is fired on our InternalTouchPoint.

Figure 10. CheckForLowSpeed interaction set

The ManyLowSpeeds interaction set checks for multiple occurrences of the LowSpeed action on our InternalTouchPoint. It also checks that we don't have a known problem at this location. If both these filters pass then it will send the LowSpeedAlert action to the HighwaysAgency. The fact that we defined a context relationship of SpeedLoc.location means that the interaction set and filters will be evaluated on a per-location basis.

Figure 11. ManyLowSpeeds interaction set

The TestFinished interaction set determines when Business Events has processed all the events. See Test Finished processing for more information.

Figure 12. TestFinished interaction set

Create the application and filter

We use Application Developer V7.0 to create the TrafficAlerter application. We assume you are familiar with using Application Developer to create J2EE applications. Import the Project Interchange file TrafficAlerterPI.zip (supplied with this article) into Application Developer. This section guides you through the main parts of the application. After you have imported the ZIP file, you should have the following projects:

Table 1. Traffic Alerter projects in the Application Developer workspace
Project name | Type | Description
TrafficAlerter | Enterprise Application | The TrafficAlerter application.
TrafficAlerterAppClient | Application Client | Contains the EJB client application that drives a workload to test our event processing system.
TrafficAlerterClient | Utility Jar | Contains EJB Home and Remote interfaces and stubs.
TrafficAlerterEJB | EJB | Contains the EJB that receives requests, calculates speeds, and puts speeds on the ObjectGrid for the filter to process.
TrafficAlerterFilter | Enterprise Application | The TrafficAlerterFilter application.
TrafficAlerterFilterWeb | Web | Contains the filter application that reads the speeds off the ObjectGrid queue, checks them against the threshold, and forwards events to Business Events if required.
TrafficAlerterUtil | Utility Jar | Contains classes used by both applications, such as the types that are written to the ObjectGrid.

When developing ObjectGrid applications in Application Developer you must add the ObjectGrid libraries to the project build path. Notice that both the TrafficAlerterEJB and TrafficAlerterFilterWeb projects reference a variable called OBJECT_GRID_JAR. Do the following:

  1. Copy the wsobjectgrid.jar file from the lib directory where you installed eXtreme Scale (for example, C:\Program Files\IBM\WebSphere\AppServer\lib) to your development environment. We copied it to the C:\WXSLib directory, but the location isn't important, as long as Application Developer has access to the file.
  2. Right-click the TrafficAlerterEJB project and select Build Path => Configure Build Path.
  3. Click on the Libraries tab, then select OBJECT_GRID_JAR and click Edit.
  4. Click Variable, and then click Edit, to change the value of OBJECT_GRID_JAR to the location of the wsobjectgrid.jar file.
  5. Click OK several times, and then rebuild your project.

Several APIs can be used to write ObjectGrid applications. We chose to use the EntityManager API, as it provides a simple and intuitive programming model for interacting with the ObjectGrid.

The ObjectGrid descriptor XML file describes the ObjectGrid. It references an entity.xml file that describes the objects that will be stored in the grid. In our case this file is very simple, but it can also be used to define various transaction and access settings. Our ObjectGrid descriptor file is called objectGrid.xml and is located in the WebContent/META-INF folder in the TrafficAlerterFilterWeb project.

Listing 1. The ObjectGrid descriptor XML file (objectGrid.xml)
<?xml version="1.0" encoding="UTF-8"?>
<objectGridConfig xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://ibm.com/ws/objectgrid/config ../objectGrid.xsd"
	xmlns="http://ibm.com/ws/objectgrid/config">
   <objectGrids>
     <objectGrid name="TrafficAlerterOG" entityMetadataXMLFile="entity.xml">
        <bean id="ObjectGridEventListener" 
          className="com.ibm.ta.startup.beans.TAOGListener" />
        <backingMap name="DataPacket"/>   
        <backingMap name="SpeedAtLocation"/>
     </objectGrid>
   </objectGrids>
</objectGridConfig>

Notice that we specify an ObjectGridEventListener class. This means the TAOGListener class will get notified when the ObjectGrid is activated. This is how our filter application starts. This is covered in more detail when we describe our filter application. The referenced entity.xml file (Listing 2) defines the objects that will be stored in the ObjectGrid. The DataPacket and SpeedAtLocation classes are in the TrafficAlerterUtil project.

Listing 2. The entity schema descriptor XML file (entity.xml)
<?xml version="1.0" encoding="UTF-8"?>
<entity-mappings xmlns="http://ibm.com/ws/projector/config/emd"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://ibm.com/ws/projector/config/emd ./emd.xsd">
  <description>"Entity Mapping description"</description>
  <entity class-name="com.ibm.ta.util.DataPacket" name="DataPacket" access="FIELD">
    <description>"This is the DataPacket class"</description>
    <attributes>
    <id name="RFID" />
    <basic name="locationId" />
    <basic name="timeInMillis" />
    </attributes>
  </entity>
  <entity class-name="com.ibm.ta.util.SpeedAtLocation" name="SpeedAtLocation"
      access="FIELD">
    <description>"Speed at a location"</description>
    <attributes>
    <id name="Id" />
    <basic name="locationId" />
    <basic name="speed" />
    </attributes>
  </entity>
</entity-mappings>

Because we use a distributed ObjectGrid we also need an objectGridDeployment.xml file. Even though part 1 runs a stand-alone eXtreme Scale server we use a distributed ObjectGrid rather than a local one so that it can be easily modified for part 2. Part 2 shows the real value in eXtreme Scale in scaling up across multiple JVMs. The objectGridDeployment.xml file specifies the number of partitions and replicas required for our ObjectGrid. In this file, we specify numberOfPartitions=1, meaning we have only one partition. Listing 3 shows the contents of the objectGridDeployment.xml file.

Listing 3. The objectGridDeployment.xml file
<?xml version="1.0" encoding="UTF-8"?>
<deploymentPolicy xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://ibm.com/ws/objectgrid/deploymentPolicy 
 ../deploymentPolicy.xsd"
 xmlns="http://ibm.com/ws/objectgrid/deploymentPolicy">
    <objectgridDeployment objectgridName="TrafficAlerterOG">
    	<!-- Change 1 to at least the number of JVMs being started -->
        <mapSet name="mapSet" numberOfPartitions="1" minSyncReplicas="0" 
        maxSyncReplicas="0" maxAsyncReplicas="0" numInitialContainers="1">
            <map ref="DataPacket" />
            <map ref="SpeedAtLocation"/>
        </mapSet>
    </objectgridDeployment>
</deploymentPolicy>

In a J2EE application the name and location of both the objectGrid.xml and objectGridDeployment.xml files are important. The XML files must have these exact names and be placed in the META-INF folder of a J2EE module. If the ObjectGrid finds these files it registers the application server as an ObjectGrid container JVM with the catalog service. As a result, the same application can be deployed in different clusters and still be treated as a single grid by the catalog service.

The TrafficAlerterBean in the TrafficAlerterEJB project is a stateless session bean. It has one remote method sendDataPacket, which is called by clients when they detect an RFID passing a sensor. To simplify testing we use an EJB client application to simulate multiple sensors, but this method could easily be externalised as a Web service.

In the ejbCreate method we call a factory method OGFactory.getObjectGrid to obtain a reference to the ObjectGrid. Listing 4 shows the OGFactory class.

Listing 4. The OGFactory class
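  import com.ibm.websphere.objectgrid.ClientClusterContext;
  import com.ibm.websphere.objectgrid.ObjectGrid;
  import com.ibm.websphere.objectgrid.ObjectGridException;
  import com.ibm.websphere.objectgrid.ObjectGridManager;
  import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;

  public class OGFactory {
    // Shared by all session bean instances.
    private static ObjectGrid og;
    private static ObjectGridManager ogManager;

    public static synchronized ObjectGrid getObjectGrid() {
      if (og == null) {initialiseOG();}
      return og;
    }

    // Connects to the catalog service and looks up the grid by name.
    private static synchronized void initialiseOG() {
      ogManager = ObjectGridManagerFactory.getObjectGridManager();
      try {
        ClientClusterContext context = ogManager.connect(null, null);
        og = ogManager.getObjectGrid(context,TrafficAlerterConstants.TA_OG_NAME);
      } catch (ObjectGridException e) {
        System.out.println("Error occurred connecting to ObjectGrid: " + e.getMessage());
        e.printStackTrace();
      }
    }
  }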

The TA_OG_NAME constant is TrafficAlerterOG and therefore points to the ObjectGrid name defined in our ObjectGrid configuration file. We use this factory class to ensure all session bean instances get the same reference to the ObjectGrid. When each session bean instance obtained its own reference to the ObjectGrid, we found that we could get intermittent com.ibm.websphere.objectgrid.plugins.OptimisticCollisionException exceptions.

The sendDataPacket method is shown in Listing 5. This method receives a data packet (containing the RFID, the location, and the time) from a client. It then calculates the speed for this RFID, and stores the location and speed in the ObjectGrid for our filter application to read and process.

Listing 5. The sendDataPacket method
  public void sendDataPacket(DataPacket data) {
    // (1) Obtain the EntityManager used to interact with the ObjectGrid.
    EntityManager em = getEntityManager();
    em.getTransaction().begin();
    try {
      DataPacket currentData = (DataPacket) em.find(DataPacket.class, data.RFID);
      if (currentData == null) {
        // (2) First sighting of this RFID: store the packet and return.
        currentData = new DataPacket();
        currentData.locationId = data.locationId;
        currentData.RFID = data.RFID;
        currentData.timeInMillis = data.timeInMillis;
        em.persist(currentData);
        em.getTransaction().commit();
      } else {
        // (3) Calculate the speed from the time elapsed since the previous location.
        BigDecimal diff = new BigDecimal(data.timeInMillis - currentData.timeInMillis);
        BigDecimal timeInHours = diff.divide(millisInHour, 10, RoundingMode.HALF_UP);
        BigDecimal speedBig = halfMile.divide(timeInHours, 0, RoundingMode.HALF_UP);
        int speed = speedBig.intValue();

        // (4) Update the stored location and time for this RFID, and commit.
        currentData.locationId = data.locationId;
        currentData.timeInMillis = data.timeInMillis;
        em.persist(currentData);
        em.getTransaction().commit();

        // (5) Set up the SpeedAtLocation object for the filter to process.
        em.getTransaction().begin();
        SpeedAtLocation sp = new SpeedAtLocation();
        sp.locationId = data.locationId;
        sp.speed = speed;
        sp.Id = UUID.randomUUID().toString();
        boolean persisted = false;
        // (6) Retry with a new ID in the unlikely event of a duplicate.
        while (!persisted) {
          try {
            em.persist(sp);
            em.getTransaction().commit();
            persisted = true;
          } catch (EntityExistsException eee) {
            sp.Id = UUID.randomUUID().toString();
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

The sendDataPacket method is described below. (See the numbered references in Listing 5.)

  1. We need a reference to the EntityManager to interact with the ObjectGrid.
  2. If no entry with this RFID is found in the ObjectGrid, then we just add this entry, then return.
  3. Here we calculate the speed based on the time in this DataPacket, and the time stored at the previous location.
  4. Now we update the location and time values that were stored in the ObjectGrid for this RFID, and commit them.
  5. Here we set up the SpeedAtLocation object that we will write to the ObjectGrid so that our filter can process it.
  6. The ID we generate should be unique, but just in case we get a duplicate, we loop round until the transaction commits.
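The retry loop in step 6 is an instance of a general pattern: generate a key, try to insert, and generate a new key if the first one is already taken. The sketch below shows the same pattern with a standard ConcurrentMap standing in for the ObjectGrid map. The class and method names are hypothetical, and the ID source is passed in as a parameter purely so that a collision can be simulated.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Sketch: store a value under a generated key, retrying if the key already exists. */
final class UniqueKeySketch {

    /** Returns the key under which the value was finally stored. */
    static <V> String storeWithUniqueKey(ConcurrentMap<String, V> map, V value,
                                         Supplier<String> idSource) {
        String key = idSource.get();
        // putIfAbsent returns the existing value when the key is taken,
        // so keep generating keys until it returns null.
        while (map.putIfAbsent(key, value) != null) {
            key = idSource.get();
        }
        return key;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> speeds = new ConcurrentHashMap<>();
        String key = storeWithUniqueKey(speeds, 35, () -> UUID.randomUUID().toString());
        System.out.println("Stored speed " + speeds.get(key) + " under key " + key);
    }
}
```

With random UUIDs a collision is extremely unlikely, so the loop body almost never runs more than once. The check costs little and protects against silently overwriting an entry.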

This explains how we calculate the speeds and store them in the ObjectGrid; next we describe how the TrafficAlerterFilter application works.

The TrafficAlerterFilter application is in the TrafficAlerterFilterWeb project. This project contains the TAOGListener class referenced in Listing 1. This class implements the ObjectGridEventListener, which means the shardActivated method will be called when ObjectGrid initializes, and the shardDeactivate method will be called when ObjectGrid is deactivated. These methods are shown in Listing 6.

Listing 6. The shardActivated and shardDeactivate methods (TAOGListener class)
  public void shardActivated(ObjectGrid arg0) {
    try {
      InitialContext ic = new InitialContext();
      WorkManager wm = (WorkManager) ic.lookup("java:comp/env/wm/WorkManager");
      // Start one TAFilterWorker per configured filter thread.
      for (int i = 0; i < TrafficAlerterConstants.NUM_FILTER_THREADS; i++) {
        Work taFilterWork = new TAFilterWorker(arg0);
        wm.startWork(taFilterWork);
        allWork[i] = taFilterWork;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void shardDeactivate(ObjectGrid arg0) {
    try {
      // Ask each worker to stop looping.
      for (int i = 0; i < TrafficAlerterConstants.NUM_FILTER_THREADS; i++) {
        Work taFilterWork = allWork[i];
        taFilterWork.release();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

Rather than directly listening to the ObjectGrid, the shardActivated and shardDeactivate methods use the Application Server asynchronous beans WorkManager API. The asynchronous beans extension to Application Server allows you to perform work on a different thread. Notice that the shardActivated method starts a number of TAFilterWorker threads. The number is controlled by the NUM_FILTER_THREADS constant, which is set to 3 by default. We found that we needed multiple threads to process all the work if a large percentage of requests ended up being sent to Business Events.

The TAFilterWorker class is our asynchronous bean that loops round listening for objects to process on the ObjectGrid. It implements the com.ibm.websphere.asynchbeans.Work interface, which means it can be scheduled to run using the WorkManager. The run method that loops looking for work on the ObjectGrid is shown in Listing 7.

Listing 7. The TAFilterWorker.run method
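   public void run() {
     Session session = getSession();
     EntityManager em = session.getEntityManager();
     try {
       // (1) Get the map containing SpeedAtLocation objects.
       ObjectMap map = session.getMap("SpeedAtLocation");
       // (2) Loop until release() sets appStopping to true.
       while (!appStopping.get()) {
         em.getTransaction().begin();
         // (3) Wait up to 2000 ms for the next entry.
         Object msgKey = map.getNextKey(2000);
         // (4) A null key means no entry arrived in time.
         if (msgKey != null) {
           // (5) Use the key to fetch the SpeedAtLocation object.
           SpeedAtLocation sp = (SpeedAtLocation) em.find(SpeedAtLocation.class, msgKey);
           // (6) A location of -1 marks the end of a test run.
           if (sp.locationId.equals("-1")) {
             sendTestFinishedEvent();
           } else {
             // (7) Forward only speeds below the threshold.
             if (sp.speed < TrafficAlerterConstants.TA_MIN_SPEED) {
               sendEventToWBE(sp);
             }
           }
           // (8) Remove the processed entry from the map.
           map.remove(msgKey);
         }
         em.getTransaction().commit();
       }
     } catch (Exception e) {
       e.printStackTrace();
       System.out.println("Error occurred: " + e.getMessage());
     }
   }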

Test Finished processing

The TrafficAlerterEJB has another remote method called testFinished, which is called by the client when it has sent all the test events. This method writes a SpeedAtLocation object with a location of -1 and a speed of -1, in a similar way to the sendDataPacket method. The filter application checks for this special location. If the location is -1, the filter sends a TestFinished event to Business Events. When Business Events receives this event it triggers the AllEventsProcessed action, which writes a message to a JMS queue. The client application listens on this queue. When it receives the message it knows that all events have passed through the filter and been processed by Business Events. We added this processing because we wanted to be able to measure the throughput of the application. To do this we needed to know when Business Events had finished processing all the events.

The run method is described below. (See the numbered references in Listing 7.)

  1. We get the map containing SpeedAtLocation objects.
  2. We use an AtomicBoolean to loop on. If ObjectGrid is stopped, the shardDeactivate method calls the release method on this class, which sets appStopping to true.
  3. This retrieves an entry from the map. It blocks for 2000 milliseconds waiting for an entry if one isn't already there.
  4. If no entry is found in the map in 2000 ms, then getNextKey returns null. If this is the case we commit the transaction and restart the loop.
  5. We found an entry in the map, so we use the key retrieved to get the SpeedAtLocation object.
  6. We check if we need to send a TestFinished event to Business Events. See Test Finished processing.
  7. This is where we check the speed against the threshold and do the filtering. If we are below the threshold we send an event to Business Events. Otherwise we do nothing.
  8. Now that this entry has been checked we remove it from the map.
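The control flow in these steps can be tried outside the application server. The following sketch replaces the ObjectGrid map with a java.util.concurrent.LinkedBlockingQueue, an assumption made purely so that the example runs with the JDK alone. It keeps the same shape: a timed wait for work, a stop flag, a sentinel location of -1, and a threshold check. The class and field names are illustrative, and the threshold of 40 is assumed to mirror TA_MIN_SPEED. Unlike the worker in Listing 7, the sketch stops when it sees the sentinel, so that it terminates without outside help.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for the filter worker; a BlockingQueue replaces the ObjectGrid map.
final class PollingFilterSketch implements Runnable {
    static final long MIN_SPEED = 40;     // assumed threshold, mirrors TA_MIN_SPEED
    static final String FINISHED = "-1";  // sentinel location written by testFinished

    static final class Reading {
        final String locationId;
        final long speed;

        Reading(String locationId, long speed) {
            this.locationId = locationId;
            this.speed = speed;
        }
    }

    private final BlockingQueue<Reading> queue;
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    final List<Reading> forwarded = new ArrayList<>();  // what would go to Business Events
    boolean finishedSeen;

    PollingFilterSketch(BlockingQueue<Reading> queue) {
        this.queue = queue;
    }

    // Asks the loop to exit after its current wait.
    void release() {
        stopping.set(true);
    }

    @Override
    public void run() {
        while (!stopping.get()) {
            Reading r;
            try {
                // Timed wait, playing the role of map.getNextKey(2000).
                r = queue.poll(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (r == null) {
                continue;  // nothing arrived in time; re-check the stop flag
            }
            if (FINISHED.equals(r.locationId)) {
                finishedSeen = true;  // the real worker sends a TestFinished event here
                release();            // sketch only: stop once the sentinel is seen
            } else if (r.speed < MIN_SPEED) {
                forwarded.add(r);     // below the threshold: needs complex processing
            }
        }
    }
}
```

The timed wait is what lets the stop flag take effect: a worker that blocked indefinitely on an empty map would never notice that the grid was shutting down.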

The run method calls the sendEventToWBE method (Listing 8). This method calls two additional methods that compose the XML to send and then send the message.

Listing 8. The sendEventToWBE method
  private void sendEventToWBE(SpeedAtLocation sal) {
    String xmlToSend = composeXMLEvent(sal.speed, sal.locationId);
    publishEvent(xmlToSend);
  }

Listing 9 shows the composeXMLEvent method.

Listing 9. The composeXMLEvent method
  private String composeXMLEvent(long speed, String location) {
    StringBuilder xml = new StringBuilder();
    xml.append("<connector name=\"WXSServer\" version=\"2.2\">"
        + "<connector-bundle name=\"SpeedEvent\" type=\"event\">"
        + "<connector-object name=\"SpeedEventObject\">"
        + "<field name=\"speed\" type=\"Integer\">");
    xml.append(speed);
    xml.append("</field><field name=\"location\" type=\"String\">");
    xml.append(location);
    xml.append("</field></connector-object></connector-bundle>");
    xml.append("<loginfo>Low Speed detected</loginfo></connector>");
    return xml.toString();
  }
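
Listing 9 appends the field values directly, which works for numeric speeds and simple location IDs. If a location could ever contain characters such as & or <, the value would need escaping before it is placed in the packet. A minimal sketch of that step follows, assuming a hypothetical XmlText helper that is not part of the TrafficAlerter source:

```java
// Hypothetical helper; not part of the TrafficAlerter source.
final class XmlText {
    private XmlText() {
    }

    // Replaces the five characters that XML reserves in text and attribute values.
    static String escape(String value) {
        StringBuilder out = new StringBuilder(value.length() + 16);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '&':
                    out.append("&amp;");
                    break;
                case '<':
                    out.append("&lt;");
                    break;
                case '>':
                    out.append("&gt;");
                    break;
                case '"':
                    out.append("&quot;");
                    break;
                case '\'':
                    out.append("&apos;");
                    break;
                default:
                    out.append(c);
            }
        }
        return out.toString();
    }

    // Builds one field element in the same shape as the fields in Listing 9.
    static String field(String name, String type, String value) {
        return "<field name=\"" + escape(name) + "\" type=\"" + escape(type) + "\">"
                + escape(value) + "</field>";
    }
}
```

With such a helper, the location field could be produced with XmlText.field("location", "String", location) instead of three separate append calls.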

The format of the XML event that Business Events expects can be determined from the Business Events Design Data UI.

  1. Under Touchpoints expand WXSServer, and right-click SpeedEvent. Select Event Properties, then click on the Event tab (Figure 13).
  2. From here you can export the Business Events packet schema and example.
Figure 13. Exporting Business Events packet schema and example

The publishEvent method, and the recreateJMSProducer that it calls, are shown in Listing 10. Some exception handling has been removed for brevity.

Listing 10. The publishEvent method
  private void publishEvent(String xmlEvent) { 
    int retries = 0;
    while (retries < 3) {
      try {
        if (session == null) {
          recreateJMSProducer();
        }
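        msg = session.createTextMessage(xmlEvent);
        // Pass the delivery mode to send(); the JMS provider overwrites
        // any delivery mode that a client sets on the message itself.
        producer.send(msg, DeliveryMode.NON_PERSISTENT,
            Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);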
        return;
      } catch (JMSException e) {
        System.out.println("JMSException occurred: " + e.getMessage());
        recreateJMSProducer();
        retries++;
      } 
    } 
    System.out.println("Publish of Event to WBE failed");
  }

  private void recreateJMSProducer() {
    if (ctx == null) {
      ctx = new InitialContext();
    }
    if (cf1 == null) {
      cf1 = (ConnectionFactory) ctx.lookup("jms/WbeTopicConnectionFactory");
    }
    connection = cf1.createConnection();
    session = connection.createSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE);
    if (dest == null) {
      dest = (Topic) ctx.lookup("jms/eventTopic");
    }
    producer = session.createProducer(dest);
  }

The methods are fairly standard JMS code for publishing a message to a topic. We try to reuse the JMS session for efficiency, but if this fails we recreate the JMS resources. Remember that the topic actually resides on the Business Events server, not the eXtreme Scale server we are running on, so we had to create local JMS Resources that point to the remote ones on the Business Events server. See Deploy and test for details.
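
The retry logic in Listing 10 follows a general pattern: attempt the send, and on failure rebuild the connection before trying again, up to a fixed number of attempts. The sketch below isolates that pattern from JMS so that it can be run with the JDK alone. The Sender interface and the RetryingPublisher class are hypothetical names introduced for illustration; in the filter application their roles are played by producer.send and recreateJMSProducer.

```java
// Hypothetical types that isolate the retry pattern of Listing 10 from JMS.
final class RetryingPublisher {
    interface Sender {
        void send(String payload) throws Exception;  // stands in for producer.send(msg)

        void reconnect() throws Exception;           // stands in for recreateJMSProducer()
    }

    private final Sender sender;
    private final int maxAttempts;

    RetryingPublisher(Sender sender, int maxAttempts) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
    }

    // Returns true once a send succeeds, false after maxAttempts failed sends.
    boolean publish(String payload) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(payload);
                return true;
            } catch (Exception sendFailure) {
                try {
                    sender.reconnect();  // rebuild resources before the next attempt
                } catch (Exception reconnectFailure) {
                    // Keep going; the next send fails again and consumes an attempt.
                }
            }
        }
        return false;
    }
}
```

Returning a boolean rather than only logging, as Listing 10 does, is a design choice of the sketch: it lets the caller decide what to do with an event that could not be delivered.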

Deploy and test

Now that the development is complete, the next step is to install and test the application. Follow these steps:

  1. Export three modules and a properties file from Application Developer.
    • Export the TrafficAlerter application to the TrafficAlerter.ear file.
    • Export the TrafficAlerterFilter application to the TrafficAlerterFilter.ear file.
    • Export the TrafficAlerterUtil project to the TrafficAlerterUtil.jar file.
    • Export the trafficAlerter.props file, which can be found in the TrafficAlerterAppClient project.
  2. Next, add the TrafficAlerterUtil.jar file to the classpath of the eXtreme Scale server. The ObjectGrid is shared between two different applications in the same application server, so the objects stored in and read from the grid must be loaded by the same classloader. If the TrafficAlerterUtil.jar file were packaged with each application, the classes would be loaded by different classloaders, and the JVM would treat them as different types. In the eXtreme Scale administrative console, navigate to Application servers => server1 => Process Definition => Java Virtual Machine and add the TrafficAlerterUtil.jar file to the classpath.
  3. The server uses the default initial and maximum heap sizes. Increase both settings to 1024 MB to give the server more heap.
  4. As mentioned earlier, we need to create two JMS resources on the eXtreme Scale server that point to the Business Events topic and topic connection factory. We also need to create a JMS queue and queue connection factory that are used for the Test Finished processing. Use the supplied Jython script createJMSResources.py to create the resources listed in Table 2. To run the script navigate to the bin directory of your eXtreme Scale installation location. Type
    wsadmin -lang jython -f createJMSResources.py WXS_Endpoint WBE_Endpoint
    WXS_Endpoint is the host and SIB_ENDPOINT_ADDRESS port of eXtreme Scale, and WBE_Endpoint is the host and SIB_ENDPOINT_ADDRESS port of Business Events (for example, localhost:7280).
Table 2. JMS resources created
Name | Type | Description
WXSBus | Service Integration Bus | Bus that contains the JMS queue used for the Test Finished processing. The eXtreme Scale server is the only bus member.
TEST_FINISHED | SIB Destination | Queue destination created on WXSBus used for the Test Finished processing.
AllEventsProcessed | JMS Queue | JMS queue resource that references the TEST_FINISHED SIB destination.
AllEventsProcessedQCF | JMS Queue Connection Factory | JMS queue connection factory resource used to access the AllEventsProcessed queue.
WbeTopicConnectionFactory | JMS Topic Connection Factory | JMS topic connection factory used by our filter application to talk to the Event topic on Business Events. The Provider endpoint is set to the SIB_ENDPOINT_ADDRESS of Business Events.
eventTopic | JMS Topic | JMS topic used by our filter application to talk to the Event topic on Business Events.
  5. Use the administrative console to install the TrafficAlerter.ear and TrafficAlerterFilter.ear applications.
  6. Disable the debug logging in the Business Events connectors. Debug logging significantly affects the performance of connectors. We changed this logging value from DEBUG to ERROR by starting the Business Events properties interface, and navigating to the Connectors section of the Full Configurator. We changed the value of the as.director.connectors.log4j.logger.com.aptsoft.connectors property to ERROR. See Figure 14.
    Figure 14. Changing logging value to ERROR
  7. Before running any tests, restart eXtreme Scale, Business Events, and the Business Events connectors.

Test using the EJB client

The TrafficAlerter client is packaged in the TrafficAlerterAppClient project in the TrafficAlerter application. It is a J2EE application client module that can be run using the launchClient tool provided with Application Server. The client simulates a number of sensors sending DataPacket objects to the filter application. It is not intended to be a realistic client application; its main purpose is to send a large number of requests into our TrafficAlerter application to test the performance.

To run the client:

  1. Ensure that the launchClient tool is in your PATH.
  2. Type
    launchClient TrafficAlerter.ear -CCclasspath=TrafficAlerterUtil.jar trafficAlerter.props

The property file trafficAlerter.props defines a number of properties used by the client to determine what it will run. These properties are described in Table 3.

Table 3. Properties used by TrafficAlerterClient
Name | Description
providerURL | This is the URL used to look up the TrafficAlerter EJB. It should include the hostname and BOOTSTRAP_ADDRESS port of the eXtreme Scale server.
ejbURL | The JNDI name of the TrafficAlerter EJB. You should not normally need to change this from the default.
locations | The number of locations that each client thread will iterate over.
numRFIDs | The number of RFIDs that each client thread will simulate. This should be set to a multiple of WBERatio.
WBERatio | This determines how many of the requests will get sent to Business Events. If this is set to 0 then all requests will be dealt with by the filter. If this is set to 1, then every request will get forwarded to Business Events. If this is set to 2 then every other request will be sent to Business Events, and the others will be handled by the filter. If this is set to 100 then 1 in every 100 requests will be sent to Business Events. This is related to the numRFIDs property, so numRFIDs should be set to a multiple of WBERatio.
threads | The number of client threads to run. Each thread will run with the same number of locations and RFIDs.
duration | The time in seconds to run the test. Set this to 0 if you want to just run for the specified number of RFIDs and locations.
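
The WBERatio arithmetic is easiest to see with numbers. The following sketch is an assumption about how such a ratio could be applied, since the client's own selection logic is not shown here: request number i is treated as needing Business Events when i is a multiple of the ratio. It also shows why numRFIDs should be a multiple of WBERatio, because only then is the forwarded count exactly numRFIDs divided by WBERatio.

```java
// Illustrative only: one way a test client could apply a 1-in-N ratio.
final class WbeRatioSketch {
    private WbeRatioSketch() {
    }

    // True when the request with this 0-based index should need Business Events.
    static boolean forwardToBusinessEvents(int index, int wbeRatio) {
        return wbeRatio > 0 && index % wbeRatio == 0;
    }

    // Counts how many of numRfids requests would be forwarded for a given ratio.
    static int forwardedCount(int numRfids, int wbeRatio) {
        int count = 0;
        for (int i = 0; i < numRfids; i++) {
            if (forwardToBusinessEvents(i, wbeRatio)) {
                count++;
            }
        }
        return count;
    }
}
```

Under this assumption, 100 RFIDs with a ratio of 2 forward 50 requests and a ratio of 100 forwards 1, while 10 RFIDs with a ratio of 3 forward 4 requests, which is not an exact third.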

Test results

To test the effectiveness of our filter we ran two sets of measurements. For the first set we ran the normal application and used the filter to filter out requests. For the second set, we removed the filtering logic from the filter application so that every single request would be sent to Business Events. Requests still flow through the filter application, but all events are sent to Business Events. This means Business Events performs the simple check to see if speeds are less than 40. This allowed us to see the effectiveness of filtering in our eXtreme Scale application, as opposed to sending all requests to Business Events. We used a number of different WBERatios. This parameter affects the number of events that can be filtered using the simple filter, and the number that require complex processing. The results are shown in Figure 15.

Figure 15. Results with and without the prefilter application

The results were obtained running the client, eXtreme Scale server, Business Events server and DB2 on the same Windows® machine. This may not be the optimal configuration, but it was the simplest to set up. In each case we used enough threads to push the machine CPU to near 100%. The results show that the greater the percentage of events that can be filtered by the eXtreme Scale prefilter, the greater the benefit of using the eXtreme Scale prefilter. If all requests require the complex processing (that is, WBERatio = 1) then there is no benefit to using the eXtreme Scale prefilter.

Summary

This article demonstrates how a simple filtering application in eXtreme Scale can improve performance in situations that require complex processing of a large volume of events by Business Events. Part 2 of this series demonstrates how this application can be extended to take advantage of ObjectGrid partitioning when you need to increase the number of eXtreme Scale and Business Events instances beyond a stand-alone server. Figure 16 illustrates this solution.

Figure 16. Solution architecture for part 2

Downloads

Description | Name | Size
Project Interchange file including source code | TrafficAlerterPI.zip | 53KB
EAR files and Business Events project file | TrafficAlerterExecutables.zip | 70KB
Jython wsadmin script | createJMSResources.zip | 3KB
