Event stream processing with WebSphere eXtreme Scale

IBM® WebSphere® eXtreme Scale exposes a rich set of APIs that enable access to data residing within a distributed, resilient, and high performance cache. These APIs support a wide range of application programming patterns. One such pattern enables repeated cache updates to be treated as a temporal sequence of events. Applications written to monitor these event sequences are able to correlate and aggregate patterns of lower level events in order to derive real-time insight into business-significant situations. This article describes the implementation of a simple WebSphere eXtreme Scale-based scenario that illustrates the real-time processing of heterogeneous event streams. This content is part of the IBM WebSphere Developer Technical Journal.

Share:

Dr. Alan Hopkins (hopkinsa@uk.ibm.com), Senior IT Specialist, IBM

Alan Hopkins photoAlan is a Consulting IT Specialist with IBM Software Group Services for WebSphere with more than 20 years experience in middleware and internet-related technologies. For much of his career he has focused on Transactional Middleware systems, and their usage in the e-Business space. Most recently Alan has been dedicated to the strategically important areas of Business Process Management/Business Activity Monitoring and Extreme Transaction Processing. At the present time Alan is a member of the Worldwide Technical Practice, who form part of IBM Software Services for WebSphere, based at IBM's Hursley Park Laboratory in England.



03 September 2008

Introduction

A common characteristic of modern enterprise architectures is the inclusion of components procured from different sources, based upon non-consistent foundation technologies, and implemented at different points in time. Not surprisingly, the resultant infrastructure is often too complex to completely understand at an IT level, let alone at a higher level business abstraction.

The occurrence of activities in these disparate subsystems represent the inner workings of today’s modern enterprise, or beyond. As a result of completely independent threads of execution, these activities will frequently occur in isolation from each other. Collectively, these heterogeneous events represent an often untapped source of insight into business threats and opportunities.

The use of the term event might be confusing at first. The term is used both to describe an activity that occurred, and something that represents an activity that occurred in a computer system. When discussing event processing, it is common to allow the overloading of the term to continue. Hopefully the context within which the term is used will be sufficient to avoid ambiguity.

A common approach to externalizing the occurrence of a significant activity within a computer system is via the emission of an event to describe the activity. The physical representation of the event is of lesser interest at this point. What is important is that activities occurring continuously within these heterogeneous subsystems can be made available for analysis from a cross-subsystem perspective. This set of heterogeneous events is sometimes referred to as an event cloud.

IBM WebSphere eXtreme Scale provides the functionality required to capture and perform real-time analysis of the event cloud. This article presents the implementation of a simple stock portfolio stop loss capability for a Web application. This implementation leverages some of the key capabilities of WebSphere eXtreme Scale that facilitate the real-time processing of heterogeneous event streams.


Scenario: Stock portfolio stop loss

This example scenario provides a simple stock portfolio stop loss capability. In this scenario, client stock portfolios encapsulate any number of stocks that might be traded in either US dollars or UK pounds. Portfolio valuations are expressed in UK pounds, and are recalculated dynamically each time an incoming stock ticker event results in a change to a given portfolio stock holding.

Figure 1. Stock portfolio stop loss scenario
Figure 1. Stock portfolio stop loss scenario

If the aggregate valuation of a portfolio changes by more than 10% on a given trading day, an alert is raised. In this example, the alert is implemented as an informational message written to the system log. (In a more realistic implementation, this alert could result in a warning rendered on a business monitoring dashboard, or an automated trade to sell some or all of the monitored stock portfolio.)


Essential aspects of WebSphere eXtreme Scale

WebSphere eXtreme Scale provides a runtime platform into which applications with ultra-demanding performance, scalability, and availability requirements can be deployed. A complete discussion of the all the capabilities provided by this technology is beyond the scope of this article, but brief descriptions of some of the capabilities leveraged in this solution are presented below. This information provides a contextual background to help you understand the content of our example scenario. (See Resources for a complete description of all WebSphere eXtreme Scale capabilities.)

  • In-memory cache

    The central capability provided by WebSphere eXtreme Scale is an in-memory cache that can be partitioned across large numbers of operating system processes that reside on disparate machines. This distributed cache can be configured to provide enterprise class qualities of service such as performance, scalability, and resiliency. From an administrative perspective, the high-level container for this in-memory cache is the ObjectGrid. Within an ObjectGrid instance, data is encapsulated within one or more maps. Application programs can read/write data from/to these maps in the form of data objects, identified by a unique key. Data integrity is assured by the inherent transactional capabilities of WebSphere eXtreme Scale.

    In order to respond to discrete events in the incoming stock/foreign exchange event streams in our example, you will make use of an ObjectGrid event listener plug-in. Because this type of plug-in is configured with a scope of an ObjectGrid instance, you will use two configured ObjectGrid instances in the present scenario. This lets you achieve isolation with respect to incoming events. Since the data residing within this cache is held entirely within memory, a key benefit of using WebSphere eXtreme Scale is the speed at which operations can be performed against cached data.

  • Programming models

    WebSphere eXtreme Scale exposes a rich set of APIs that collectively support the access of cached data via a number of application programming patterns. In this scenario, you will exploit two of these patterns:

    • In the first instance, you will use a simple map API to write entries to the cache. The WebSphere eXtreme Scale Map API is similar to the Java™ Collections Map API in that you essentially write entries identified by a unique key to a map object.
    • In addition, you will also use the WebSphere eXtreme Scale Stream Query API to handle data updates representing continually changing stock and foreign currency exchange rates. This enables updates to a WebSphere eXtreme Scale map to be treated as a temporal event stream. This capability is provided as an extension to the core map capability. Support for event stream handling introduces the concept of a stream map and a view map. A stream map is used to capture one or more incoming raw event streams. The precise nature of the extraction and aggregation of data elements from the raw data stream that are used to populate the view map is expressed using Stream Query Language, an SQL-like language that contains specific extensions to facilitate the explicit processing of temporal event streams. Processing of this raw event stream is handled by the Stream Processing Technology (SPT) engine. SPT engine output is written to the corresponding stream view map.

    Figure 2 shows a pictorial representation of the processing flow through these WebSphere eXtreme Scale components.

    Figure 2. Event stream processing with WebSphere eXtreme Scale
    Figure 2. Event stream processing with WebSphere eXtreme Scale
  • ObjectGrid event listener

    WebSphere eXtreme Scale is architected to provide pluggable extension points through which user code can be incorporated. The ObjectGrid event listener extension point enable the incorporation of code that is invoked when significant ObjectGrid lifecycle events occur. The sample scenario leverages this capability to introduce code that will be executed every time a transaction is committed against one of the two ObjectGrids that are used here. This pluggable extension point provides the mechanism through which you will incorporate code that dynamically recalculates aggregate portfolio valuations every time you receive a stock ticker event.


Implementation

Essential elements of this sample scenario are described below. The development of this scenario was performed with these software product versions:

  • IBM WebSphere Integration Developer Version 6.1 (Java perspective)
  • IBM WebSphere eXtreme Scale Version 6.1
  • IBM Java Runtime Environment Version 1.5.0

Although the underlying Java runtime environment in the development scenario was J2SE™, this application should run equally well in an IBM WebSphere Application Server-based environment.

WebSphere eXtreme Scale configuration

Configure WebSphere eXtreme Scale using two complementary configuration files. The first, objectGrid.xml (Listing 1) contains the definition of the two ObjectGrids that are used in this scenario:

  • StocksGrid is used to encapsulate incoming event streams that contain foreign exchange and stock ticker events. Within this ObjectGrid you define four backing maps:

    • Incoming raw stock and foreign exchange event streams are written to stockStreamMap and forexStreamMap, respectively.
    • View data derived from these two initial maps by the STP engine, subject to the specification of the corresponding streams and views, is written to stockViewMap and forexViewMap.

    Specification of the data object attributes that are relevant to the scenario can be found within the stream definitions. The time-based aggregation of data from these streams is provided via the view definitions.

    One further point to note in the configuration data below is the specification of the ObjectGridEventListener plug-in on the StocksGrid ObjectGrid instance.

  • CustomerPortfolioGrid is used to contain the details of individual customer portfolios. This ObjectGrid instance contains two backing map instances: StockOwners enables you to determine the owners of any given stock, whilst Portfolios enables you to retrieve the details of all monitored Customer portfolios.

Listing 1. Configuration file objectGrid.xml
<objectGrids>
  <objectGrid name="StocksGrid">
    <bean id="ObjectGridEventListener" className="wxs.plugins.ObjectGridEventListener"/>
    <backingMap name="stockStreamMap" streamRef="stockStream"/>
    <backingMap name="forexStreamMap" streamRef="forexStream"/>            
    <backingMap name="stockViewMap" viewRef="stockView"/>            
    <backingMap name="forexViewMap" viewRef="forexView"/>

    <streamQuerySet name="stockMonitoringSQS">
        <stream name="stockStream" 
     		valueClass="wxs.streamquery.example.StockQuote" 
     		sql="create stream stockStream keyed by t (price DECIMAL (9,2), 
                  tickerSymbol VARCHAR(100));"
           	access="FIELD" >
        </stream>
        <stream name="forexStream" 
         	valueClass="wxs.streamquery.example.ForexQuote" 
          	sql="create stream forexStream keyed by t (rate DECIMAL (9,4), 
                    type VARCHAR(100) );"
               access="FIELD" >
        </stream>
        <view name="stockView"
          sql="CREATE VIEW stockView AS SELECT tickerSymbol, avg(price) as avgPrice FROM 
               (SELECT * FROM stockStream FETCH LATEST 5 MINUTES) group by tickerSymbol"
          access="FIELD">
        </view>
        <view name="forexView"                    
              sql="CREATE VIEW forexView AS SELECT type, avg(rate) as avgRate FROM 
                   SELECT * FROM forexStream FETCH LATEST 5 MINUTES) group by type"
              access="FIELD">
        </view>                 
    </streamQuerySet>
</objectGrid>
        
<objectGrid name="CustomerPortfolioGrid">
     <backingMap name="StockOwners" readOnly="false" lockStrategy="PESSIMISTIC"/>
     <backingMap name="Portfolios" readOnly="false" />      	
   <ObjectGrid>      	
</objectGrids>

A secondary configuration file, objectGridDeployment.xml, defines characteristics of your cache topology. The essential elements of this topology configuration is presented in Listing 2, which shows that each ObjectGrid instance is made up of a single partition.

Listing 2. Configuration file objectGridDeployment.xml
<objectgridDeployment objectgridName="StocksGrid">
     <mapSet name="mapSet1" numberOfPartitions="1"
         minSyncReplicas="0" maxSyncReplicas="0"
         maxAsyncReplicas="0" numInitialContainers="1">
         <map ref="stockStreamMap"/>   
   	  <map ref="forexStreamMap"/>   
	  <map ref="forexViewMap"/>     
	  <map ref="stockViewMap"/>
     </mapSet>
</objectgridDeployment>

<objectgridDeployment objectgridName="CustomerPortfolioGrid">
     <mapSet name="mapSet1" numberOfPartitions="1"
         minSyncReplicas="0" maxSyncReplicas="0"
         maxAsyncReplicas="0" numInitialContainers="1">
         <map ref="StockOwners"/>   
         <map ref="Portfolios"/>
     </mapSet>
</objectgridDeployment>

Cached object data model

Define two sets of POJOs to encapsulate the data that is cached within your ObjectGrid instances:

  • The first set, used to hold state data describing customer, stock portfolios, and stock holdings, is defined during scenario initialization and remains static throughout the scenario lifetime (Figure 3).
    Figure 3. Static data model
    Figure 3. Static data model
  • The second set of data objects represent stock and currency exchange rate events that make up the event streams consumed by this scenario (Figure 4).
    Figure 4. Event stream data model
    Figure 4. Event stream data model

Code artifacts

This implementation involves the Java code artifacts listed in Table 1.

Table 1. Code artifacts
Class nameDescription
wxs.streamquery.example.StockOwnersPOJO that encapsulates set of portfolio IDs that own a given stock.
wxs.streamquery.example.PortfolioPOJO that encapsulates a customer portfolio.
wxs.streamquery.example.StockQuotePOJO that encapsulates a stock ticker event.
wxs.streamquery.example.ForexQuotePOJO that encapsulates an exchange rate ticker event.
wxs.streamquery.example.StockHoldingPOJO that encapsulates the portfolio holding of a single stock.
Wxs.streamquery.example.StatusFlagPOJO used to denote that scenario is initializing.
wxs.streamquery.example.ScenarioInitCommand line Java program that initializes the stop loss scenario.
wxs.streamquery.emitter.ForexEventStreamEmitterCommand line Java program used to emit a stream of exchange rate ticker events.
wxs.streamquery.emitter.StockEventStreamEmitterCommand line Java program used to emit a stream of stock ticker events.
wxs.streamquery.emitter.SingleStockEventEmitterCommand line Java program used to emit a single stock ticker event.
wxs.plugins.ObjectGridEventListenerListener program that is invoked for every occurrence of a state change to a given ObjectGrid instance. In this scenario, you use this program to react to committed changes to the StocksGrid ObjectGrid instance.
wxs.utility.CustomerPortfolioGridHelperUtility program that manages the acquisition of a handle to the CustomerPortfolioGrid ObjectGrid singleton instance.
wxs.utility.StocksGridHelperUtility program that manages the acquisition of a handle to the StocksGrid ObjectGrid singleton instance.

The next sections take a closer look at some of these artifacts. Because the intent here is to present an overview of the most significant aspects of these code artifacts, exhaustive descriptions of all the Java programs that make up this scenario are not included here. However, all code artifacts described in the table above can be found in the download file that accompanies this article.


Scenario initialization

Initialization of the scenario is encapsulated within ScenarioInit, a command line program responsible for creating and writing instances of the StockOwners and Portfolio POJOs to the CustomerPortfolioGrid ObjectGrid instance. This state information is subsequently used during the detection of stop loss situations when ObjectGridEventListener is running. Scenario initialization is also responsible for writing initial valuations for each stock used within the scenario, and the initial GB pounds to US dollars currency exchange rate to the StocksGrid ObjectGrid instance.

State data definition

If you examine ScenarioInit, you will see that the details of each stock, portfolio, and exchange rate is hardcoded at the start of the class source code.

Listing 3. Definition of stock portfolios and initial stock prices
static String[] stockList = {"AIG",  "AXP",  "ATT",  "INTC",  "MOT",  "IBM", "CSCO",
                             "JAVA", "MSFT", "ORCL", "HBOS", "HSBA",  "BAY", "BGY",
                             "BLND", "GSK", "ITV", "LAND",  "III", "BT"};

static float[] stockPrices = {49.12f, 52.40f, 24.95f, 23.56f,  10.07f, 123.29f, 26.92f,
                              13.14f, 29.43f, 21.55f, 4.71f, 8.82f, 2.09f, 7.14f,
                              8.04f, 11.53f, 0.61f, 14.67f, 8.97f, 2.33f}; 

static int[][] portfolioVolumes = {		
     	{1000, 500, 2000, 850, 4000, 2000, 7000, 2500, 5000, 9000, 2000},
{1000, 2500, 4000, 9000, 500, 900, 600, 2000},
	{8000, 900, 2500, 1000, 2000, 3000},
	{3000, 1000, 2500, 5500, 500, 2000, 7500, 1000},
	{3000, 5500, 4000, 500, 3000, 7000, 9000},
	{3500, 2500, 9000, 6500, 3000, 1000, 2500, 1000},
	{1000, 7000, 8000, 3000, 500, 3500, 5500},
	{9000, 3000, 5500, 1000, 8000, 4000, 3000, 2000, 1500},
	{2500, 500, 4000, 3000, 9000, 8000, 7000, 6000},
	{5500, 1000, 2500, 500, 3000, 8000, 500, 3000}};

static String[][] portfolioStocks = {	
{"AIG", "AXP", "ATT", "INTC", "MOT", "IBM", "CSCO", "JAVA", "MSFT", "HBOS"},
{"IBM", "CSCO", "JAVA", "MSFT", "ORCL", "BAY", "BGY", "BLND"},
{"IBM", "CSCO", "JAVA", "GSK", "ITV", "LAND"},
{"ATT", "INTC", "MOT", "IBM", "CSCO","III", "BT", "HBOS"},
{"AIG", "ATT", "INTC", "MOT", "JAVA", "ORCL","HSBA"},
{"ATT", "INTC", "MOT", "IBM", "CSCO",  "ORCL", "BAY", "BGY"},
{"AIG", "AXP", "INTC", "JAVA", "ORCL","BLND", "GSK"},
{"AIG", "AXP", "INTC", "IBM", "CSCO", "JAVA", "ORCL","ITV","LAND"},
{"AIG", "ATT", "MOT", "CSCO", "MSFT", "ORCL","III","BT"},
{"AXP", "ATT", "INTC", "IBM", "MSFT", "ORCL","GSK", "ITV"}};
   	
float initialDollarToPoundsForexRate = 1.9876f;

Stock ownership state data creation

The code fragment in Listing 4 illustrates how you can write stock ownership data to the StockOwners map within the CustomerPortfolioGrid ObjectGrid instance. For each stock referenced within the scenario, as defined within the stockList array in Listing 3, you create a StockOwners object instance with a key value set to the stock ticker symbol from stockList[]. You then construct an array of CustomerIDs that contains an entry for every portfolio that contains the stock represented by the StockOwners object instance. This array is set as the value of the customerIDs attribute, before writing the object to the StockOwners map.

After the code in Listing 4 has run, the StockOwners map contains one instance of the StockOwners object for every stock defined within the stockList[] array.

Listing 4. Population of StockOwners map with static data
portfolioGrid = CustomerPortfolioGridHelper.getOG("CustomerPortfolioGrid");
stocksGrid = StocksGridHelper.getOG("StocksGrid");
    	    		
/* -------------------------------------------------------- */
/* Get ObjectGrid Session                                   */
/* -------------------------------------------------------- */
Session portfolioSession = portfolioGrid.getSession();

/* -------------------------------------------------------- */
/* Write Stock Ownership StockOwners Map                    */
/* -------------------------------------------------------- */
ObjectMap ownersMap = portfolioSession.getMap("StockOwners");	 
        
portfolioSession.begin();        
        
// Create array of StockOwners objects
for (int i = 0; i < stockList.length; i++)
{
	ArrayList customerIDs = new ArrayList();
	StockOwners stockOwners = new StockOwners(stockList[i]);
	stockOwners.setTickerSymbol(stockList[i]);
        	
	for (int j = 0; j < portfolioStocks.length; j++)
	{
		for (int k = 0; k < ScenarioInit.portfolioStocks[j].length; k++)
    		{
    			if (ScenarioInit.portfolioStocks[j][k].equals(stockList[i]))
    			{
    				customerIDs.add(String.valueOf(j + 1));
    			}
    		}
    	}
    	stockOwners.setCustomerIDs(customerIDs);
    	ownersMap.put(stockOwners.getTickerSymbol(), stockOwners);   
}
portfolioSession.commit();

Acquisition of ObjectGrid handle

In Listing 4, the acquisition of a handle to the configured ObjectGridInstances CustomerPortfolioGrid and StocksGrid is encapsulated within two similar helper classes. Figure 5 shows the essential aspects of one of these helper classes.

Listing 5. Population of Portfolios map with static data
synchronized static public ObjectGrid getOG(String objectGrid)
{
   if (og == null)
   {			
      try
      {			
	 URL url = Thread.currentThread().getContextClassLoader().
		getResource("SQ_ObjectGrid.xml");
								
	 ClientClusterContext ccc = bjectGridManagerFactory.getObjectGridManager().
		connect("localhost:6000", null,url);
								
	 ObjectGridManager ogm = ObjectGridManagerFactory.getObjectGridManager();
	 
         og = ogm.getObjectGrid(ccc,objectGrid);
			
      }
      catch (Exception e)
      {
	  e.printStackTrace();
      }			
   }
   else
   {
      System.out.println("StocksGridHelper.getOG() - OG already initialised");
   }		
   return og;
}

Customer portfolio state data creation

You must now write the state data that defines customer stock portfolios into the Portfolios map within the CustomerPortfolioGrid ObjectGrid instance. Once again, this data is used whilst monitoring changes to the aggregate value of customer portfolios in order to provide the stop loss capability. The code fragment used to write portfolio state data into WebSphere eXtreme Scale is shown in Listing 6. The initial aggregate portfolio valuation is calculated for each portfolio, and the value is copied into the initialValuation attribute of each portfolio object instance.

When the code in Listing 6 has run successfully, one portfolio object instance has been written into the Portfolios ObjectMap for each defined customer stock portfolio.

Listing 6. Population of Portfolios map with static data
ObjectMap portfoliosMap = portfolioSession.getMap("Portfolios");
        
for (int j = 0; j < ScenarioInit.portfolioStocks.length; j++)
{
 	float portfolioValuation = 0.0f;
 	Portfolio portfolio = new Portfolio(String.valueOf(j + 1));
 	ArrayList<StockHolding> stockHoldingList = new ArrayList<StockHolding>();	
    		
 	portfolioSession.begin();     
    		
 	for (int k = 0; k < ScenarioInit.portfolioStocks[j].length; k++)
 	{
 	   int numUnits = ScenarioInit.portfolioVolumes[j][k];
 	   StockHolding stockHolding = 
           new StockHolding(ScenarioInit.portfolioStocks[j][k]);
    	   stockHolding.setNumUnits(numUnits);
    				
    	   if (isUSStock(ScenarioInit.portfolioStocks[j][k]))
    	   {
    	      stockValuation = 
                   numUnits * initialStockPrice(portfolioStocks[j][k])/
                                                  initialDollarToPoundsForexRate;
    	   }
    	   else
    	   {
    	      stockValuation = numUnits * initialStockPrice(portfolioStocks[j][k]);
    	   }
    	   stockHolding.setLatestValuation(stockValuation);
    	   stockHoldingList.add(stockHolding);
    	   portfolioValuation += stockValuation;    			
    	}
    	portfolio.setStocks(stockHoldingList);
    	portfolio.setInitialValuation(portfolioValuation);    			
    	portfoliosMap.insert(portfolio.getCustomerID(), portfolio);  
}
portfolioSession.commit();

Write initial events to event stream maps

Next, you must prime the stock and currency exchange event streams by inserting objects that represent the opening prices for each stock within the scenario into stockStreamMap, and the opening GB pound to US dollar exchange rate into forexStreamMap, respectively.

Essential elements of the code required to insert these events is presented in Listing 7.

Listing 7. Scenario initialization: Insertion of initial events
Session stockStreamSession = stocksGrid.getSession();

ObjectMap stockStreamMap = stockStreamSession.getMap("stockStreamMap");
ObjectMap stockViewMap = stockStreamSession.getViewMap("stockViewMap");
    		
stockStreamSession.begin();
        
StockQuote.QuoteCurrency currency = null;
    		
for (int i = 0; i < stockList.length; i++)
{
 	if (i < 10)
 	{
 		currency = StockQuote.QuoteCurrency.USDOLLAR;
 	}
 	else
 	{
 		currency = StockQuote.QuoteCurrency.GBPOUND;
 	}

 	stockStreamMap.insert(stockList[i], new StockQuote(stockList[i], 
      stockPrices[i], 
      currency));  
}

    		
// Forex
forexStreamMap.insert("USDOLLAR_TO_GBPOUND", new ForexQuote("USDOLLAR_TO_GBPOUND",
        initialDollarToPoundsForexRate));

stockStreamSession.commit();

ObjectGridEventListener

As described earlier, WebSphere eXtreme Scale is designed to provide a number of extension points that enable the default capability of the technology to be extended. Code modules can be specified to run at specific defined points in the overall WebSphere eXtreme Scale application lifecycle.

The ObjectGridEventListener plug-in module is provided to enable the incorporation of code that will be invoked at strategic points during transaction lifecycle management. You can leverage this capability to introduce functionality that runs each time a transaction is committed to the StocksGrid ObjectGrid instance to which all stock and foreign exchange ticker events are written. As such, the implementation of the ObjectGridEventListener is at the heart of the event-driven scenario presented here. The plug-in is responsible for processing events that represent updates to views derived from both stock and foreign exchange tickers.

Aggregate portfolio value recalculations are performed after an update to the stockView view, which is derived from the stockStream event ticker stream. These aggregate values are computed using time-averaged stock and foreign exchange values derived from their respective event streams.

The most important aspects of the endTransaction() method are presented in Listing 8. (The code shown here is condensed for simplicity, and is intended only to convey the essence of how the code works. For a full code listing, refer to the included download file.) A collection object encapsulating the changes contained within a given transactional bracket is passed in as a parameter on the method invocation. The first section of code focuses on determining the most recent time-averaged forex value. To retrieve this value, you navigate through interrelated entity API objects to eventually arrive at the current time averaged value.

Listing 8. ObjectGridEventListener: Retrieval of time-averaged forex value
public void transactionEnd(String txid, boolean isWriteThruEnabled,
   boolean committed, Collection changes) 
{

 Iterator logSequenceIterator = changes.iterator();

 while (logSequenceIterator.hasNext())
 {
    LogSequence logSequence = (LogSequence)logSequenceIterator.next();
    String mapName = logSequence.getMapName();
    Iterator forexChangeIterator = logSequence.getAllChanges();

    if ((mapName.equals("forexViewMap") && logSequence.isDirty()))
    {

	while (changeIterator.hasNext())
	{
	  try
	  {
	   LogElement logElement = (LogElement)changeIterator.next();
	   Tuple forexKeyTuple = (Tuple)(logElement.getCacheEntry().getKey());
	   String key = (String)forexKeyTuple.getAttribute(0);
	   ogSession = instrumentGrid.getSession();
	   ObjectMap forexViewMap = ogSession.getViewMap("forexViewMap");
	   EntityMetadata emd = forexViewMap.getEntityMetadata();
	   TupleMetadata keyMD = emd.getKeyMetadata();
	   Tuple forexTuple = keyMD.createTuple();
	   forexTuple.setAttribute(0, "USDOLLAR_TO_GBPOUND"); //Could use key value
	   Tuple forexValueTuple = (Tuple)forexViewMap.get(forexTuple);
	   timeAveragedForexValuation = (Float)forexValueTuple.getAttribute(0);
	}
	catch ( Exception e)
      {......}
    }
  }
 }
}

You then need to handle the case in which the transaction that just committed had resulted from an incoming stock ticker event; that is, the transaction commit that caused the invocation of your transactionEnd() method was against the stockView map. For this case, you first establish the key of the event, which is set to the stock ticker symbol, and then retrieve the current time-averaged stock price valuation for this ticker symbol.

Listing 9. ObjectGridEventListener: Retrieval of time-averaged stock values
else if (mapName.equals("stockViewMap") && logSequence.isDirty())
{
   while (changeIterator.hasNext())
   {
      try
      {
	   LogElement logElement = (LogElement)changeIterator.next();
	   LogElement.Type type = logElement.getType();	
	   Tuple keyTuple = (Tuple)logElement.getCacheEntry().getKey();
	   String key = (String)keyTuple.getAttribute(0);

	   Session instrumentSession = instrumentGrid.getSession();
	   instrumentSession.beginNoWriteThrough();
	   ObjectMap stockViewMap = instrumentSession.getViewMap("stockViewMap");

	   EntityMetadata emd = stockViewMap.getEntityMetadata();
	   TupleMetadata keyMD = emd.getKeyMetadata();
	   Tuple ibmKey = keyMD.createTuple();
	   ibmKey.setAttribute(0, key);

	   Tuple ibmValue = (Tuple) stockViewMap.get(ibmKey);
	   Float timeAveragedValuation = (Float)ibmValue.getAttribute(0);

 	   info("Stock Ticker Event :: Average price in the last 5 minutes for " +
                                               key + " = " + timeAveragedValuation);

	   instrumentSession.commit();

Once you have established the stock ticker symbol and current time-averaged stock price, you can go ahead and recalculate the aggregate value of all portfolios that contain a holding of this stock. First, retrieve a list of the portfolios that hold the stock represented by the event that is currently being processed (Listing 10).

Listing 10. ObjectGridEventListener: Retrieval of portfolios containing current stock
/* ------------------------------------------------------------- */
/* Retrieve List of Portfolios that hold this stock              */
/* ------------------------------------------------------------- */
ObjectMap portfoliosMap = portfolioSession.getMap("Portfolios");

StockOwners stockOwners = (StockOwners) holdingsMap.get(key);

ArrayList portfolioIDs = stockOwners.getCustomerIDs();

Iterator portfolioIterator = portfolioIDs.iterator();

Then, iterate through each stock held in the returned portfolio list and recalculate the aggregate portfolio valuation using the most recent time-averaged stock and forex prices (Listing 11).

Listing 11. ObjectGridEventListener -- Recalculation of aggregate portfolio valuations
while (portfolioIterator.hasNext())
{
portfolioSession.begin();
	String portfolioID = (String)portfolioIterator.next();	
	Portfolio portfolio = (Portfolio)portfoliosMap.get(portfolioID);
	ArrayList stocksList = portfolio.getStocks();
	ArrayList updatedStocksList = new ArrayList();
	Iterator stocksIterator = stocksList.iterator();

	float newPortfolioValuation = 0.0f;
	while (stocksIterator.hasNext())
	{
		StockHolding stockHolding = (StockHolding) stocksIterator.next();
		ibmKey.setAttribute(0, stockHolding.getTickerSymbol());
		ibmValue = (Tuple) stockViewMap.get(ibmKey);
		timeAveragedValuation = (Float)ibmValue.getAttribute(0);
		info("Stock Ticker Event :: Average price in the last 5 minutes for " 
			+ stockHolding.getTickerSymbol() + " = " + timeAveragedValuation);

		boolean usStockFlag = isUSStock(stockHolding.getTickerSymbol());
		if (usStockFlag)
	{
	   newStockValuation = stockHolding.getNumUnits() *
             timeAveragedValuation.floatValue()/timeAveragedForexValuation;
		}
		else
		{			        			
		   newStockValuation = stockHolding.getNumUnits() * 
                                                timeAveragedValuation.floatValue();
		}
		newPortfolioValuation += newStockValuation;
	}
	portfolioSession.commit();

Finally, check whether the aggregate valuation of each portfolio has changed by more than 10% of its original value (Listing 12).

Listing 12. ObjectGridEventListener: Check whether portfolio valuation has changed by more than 10%
float relativeChange = newPortfolioValuation/portfolio.getInitialValuation();

/* ---------------------------------------------------------------- */
/* If relative portfolio valuation  is > 1.10 or < 0.90 raise Alert */
/* ---------------------------------------------------------------- */
if ((relativeChange > 1.10) || (relativeChange < 0.90))
{
info("!************************************************************");
	info("!*");
	info("!* Portfolio Valuation Alert");
	info("!* Portfolio ID = " + portfolioID );
	info("!* Initial Valuation = " + portfolio.getInitialValuation());
	info("!* New Valuation = " + newPortfolioValuation);
	info("!* Relative Valuation Change = " + relativeChange);
	info("!*");
	info("!* Alert Triggering Stock = " + key);
	info("!*");
	info("!************************************************************");
}

Event emission

To illustrate the event-driven characteristics of this stock portfolio stop loss capability, you must be able to simulate the emission of events. This capability is provided by a set of command line utility programs that are effectively WebSphere eXtreme Scale client programs. Because these programs are all coded in an essentially consistent manner, the essential aspects of only one of these programs, StockEventStreamEmitter, is shown in Listing 13.

Listing 13. Event emission
ObjectMap stockMap = getObjectMap("stockStreamMap");

for (int i = 0; i < numEvents; i++)
{
int stockIndex = random(stockList.length);
	int priceIndex = random(stockPrices[stockIndex].length);
	float stockPrice = stockPrices[stockIndex][priceIndex];

	if (stockIndex < 10)
	{
		quoteCurrency = StockQuote.QuoteCurrency.USDOLLAR;
	}
	else
	{
		quoteCurrency = StockQuote.QuoteCurrency.GBPOUND;
	}

	StockQuote stockQuote = new StockQuote(stockList[stockIndex],
		stockPrice, quoteCurrency );

	stockMap.update(stockQuote.getTickerSymbol(), stockQuote);

	Thread.sleep(sleepTime);
}

For the purpose of this example, this code iteratively instantiates a StockQuote object using a ticker symbol and corresponding price, selected pseudo-randomly from hardcoded sets of possible values. These StockQuote Objects are then written to the ObjectMap defined as the basis of your stock event stream.


Scenario execution

Be aware that the execution of this event stream processing scenario is simplified through the use of scripts that encapsulate the invocation of a number of executables. The scripts that were used during the testing of this scenario are provided in the included download file, and can be modified for running your own sample scenario.

Start catalog server

WebSphere eXtreme Scale makes use of a catalog server to provide essential services to the distributed elements of the runtime topology. Start a catalog server instance from the command line by executing a command resembling the following from within the ObjectGrid bin directory:

C:\IBM\ObjectGrid\ObjectGrid\bin>startCatalog.bat cs1 hostname 6000

Notice that this command starts a catalog server instance named cs1 listening on port 6000. Replace hostname with the name of the machine on which you are running this scenario. Look for a message resembling the following as confirmation that the catalog server has initialized successfully:

[22/05/08 11:30:18:328 BST]   fc00fc ServerImpl    I CWOBJ1001I:
ObjectGrid Server cs1
is ready to process requests.

Start container server

Cached data resides within the JVM heap of one or more container servers. The containier server in this scenario also provides the runtime container within which both the SPT engine and your ObjectGridEventListener will execute. As such, you need to ensure that your executable code artifacts are available on the container server classpath.

These files are packaged within the download file that accompanies this article:

  • EventStreamProcessing.jar
  • ObjectGrid.xml
  • ObjectGridDeployment.xml

To execute the scenario, it is necessary to add EventStreamProcessing.jar to the classpath of the WebSphere eXtreme Scale ContainerServer. Copy the ObjectGrid.xml and ObjectGridDeployment.xml configuration files, described above, to an appropriate location in your file system and specify them on the parameters within your container server startup script. In our test environment, the container server startup script in Listing 14 was used.

Listing 14. Container server startup script
call "%~dp0setupCmdLine.bat"

"C:\IBM\ObjectGrid\java\jre/bin/java" "-Xmx512m" 
"-classpath" "C:\IBM\ObjectGrid\java\lib\tools.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\objectgrid.jar;
C:\IBM\ObjectGrid\ObjectGrid\session\lib\sessionobjectgrid.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\cglib.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\ogstreamquery.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\castor.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\commons-io.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\mx4j.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\mx4j-remote.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\mx4j-tools.jar;
C:\IBM\ObjectGrid\ObjectGrid\properties;
c:\IBM\ObjectGrid\EventProcessing\EventStreamProcessing.jar" 

"com.ibm.ws.objectgrid.InitializationService" %1 
-objectgridFile c:\IBM\ObjectGrid\EventProcessing\ObjectGrid.xml 
-deploymentPolicyFile c:\IBM\ObjectGrid\EventProcessing\ObjectGridDeployment.xml 
-catalogServiceEndpoints localhost:6000

An arbitrary container server name is specified upon the invocation of this script; for example, if the script resides within c:\IBM\ObjectGrid\ObjectGrid\bin and a container server name of c0 is required:

c:\IBM\ObjectGrid\ObjectGrid\bin startEventProcessingContainer.bat c0

Upon successfully starting the container server, you should see output messages that confirm the availability of our two configured ObjectGrid instances, as shown in Listing 15.

Listing 15. Successful container server startup
[22/05/08 11:31:47:671 BST]   fc00fc ServerImpl    I CWOBJ1001I: ObjectGrid Server c0 is
 ready to process requests.
[22/05/08 11:31:48:312 BST] 4b8e4b8e ReplicatedPar I CWOBJ1511I: CustomerPortfolioGrid:I
BM_SYSTEM_ENTITYMANAGER_MAPSET:0 (primary) is open for business.
[22/05/08 11:31:48:343 BST]  4d204d2 ReplicatedPar I CWOBJ1511I: CustomerPortfolioGrid:m
apSet1:0 (primary) is open for business.
[22/05/08 11:31:48:390 BST] 1d781d78 PeerManagerIm I CWOBJ8601I: PeerManager found peers
 of size 1
[22/05/08 11:31:48:390 BST] 1d781d78 ServerAgent   I CWOBJ7206I: New leader is (9.145.21
.53:3912). Old leader was (<null>).
[22/05/08 11:31:48:390 BST] 1d781d78 ServerAgent   I CWOBJ7203I: Leader changed.  New le
ader (9.145.21.53:3912) is elected in core group (DefaultZoneCG0) and re
ported to catalog server.
[22/05/08 11:31:48:515 BST]  4d204d2 ReplicatedPar I CWOBJ1511I: StocksGrid:mapSet1:0 (p
rimary) is open for business.
[22/05/08 11:31:48:578 BST] 4b8e4b8e ReplicatedPar I CWOBJ1511I: StocksGrid:IBM_SYSTEM_E
NTITYMANAGER_MAPSET:0 (primary) is open for business.

Initialize scenario

Once the container server has successfully started, it is necessary to prime the scenario, which involves the insertion of state data into the CustomerPortfolioGrid ObjectGrid instance, defining portfolio holdings. This exercise also writes initial stock and foreign exchange ticker events into the StocksGrid ObjectGrid instance. A command line program has been provided for this purpose. This program can be run from the command line using the following command, provided that the current directory contains EventStreamProcessing.jar, and Java 5 is specified on the PATH variable.

Listing 16. Write initial stock and foreign exchange ticker events
java "-classpath" "c:\IBM\ObjectGrid\ObjectGrid\lib\castor.jar;
c:\IBM\ObjectGrid\ObjectGrid\lib\commons-io.jar;
c:\IBM\ObjectGrid\ObjectGrid\lib\objectgrid.jar;
c:\IBM\ObjectGrid\ObjectGrid\lib\ogstreamquery.jar;
c:\IBM\ObjectGrid\java\lib\tools.jar;EventStreamProcessing.jar"
wxs.streamquery.example.ScenarioInit

Event stream emission

After checking that ScenarioInit ran successfully, you can commence to emit stock and foreign exchange ticker events. Once again, sample utility programs are provided in the download file for this purpose:

  • wxs.streamquery.emitter.StockEventStreamEmitter
  • wxs.streamquery.emitter.ForexEventStreamEmitter

Example scripts are provided to encapsulate the command string required to execute each of these utility programs. Be aware that each utility requires two parameters: the first specifies the number of events to emit, and the second specifies a time delay in milliseconds between consecutive events.

For example, to emit 500 stock ticker events at intervals of 500 milliseconds, enter:

C:\IBM\ObjectGrid\EventProcessing>emitStockTickerEvents.bat 100 500

To emit 500 forex ticker events at intervals of 500 milliseconds, enter:

C:\IBM\ObjectGrid\EventProcessing>emitForexTickerEvents.bat 100 500

To facilitate the demonstration of this scenario, a utility program is provided that combines a “tail” function that works on Windows® platforms and has a highlighting capability. Configuration of the program to display the output written to the container server SystemOut.log file, and to display stock and foreign exchange event output in contrasting colours, results in steady state output that resembles the following:

Figure 5. Output of time-averaged stock and forex values
Figure 5. Output of time-averaged stock and forex values

To illustrate the portfolio stop loss detection capability, you must emit an event that will cause the calculated aggregate portfolio values to stray outside the allowable range. Once again, a sample program and script are provided for this purpose.

Open a command window and execute the emitSingleStockTickerevent.bat script. Notice that this command takes two parameters: the ticker symbol of the stock, and the stock value:

C:\IBM\ObjectGrid\EventProcessing>emitSingleStockTickerEvent.bat IBM 10.0

In our development environment, this was sufficient to trigger a stock portfolio valuation alert that resulted in the following output in our log display, highlighting utility program. (It might be necessary for you to inject several low value stock ticker events to drive the averaged value low enough to trigger an alert.)

Figure 6. Output of aggregate portfolio valuation alert
Figure 6. Output of aggregate portfolio valuation alert

Download

DescriptionNameSize
Code sampleEventStream_Attachment_23072008.zip52 KB

Resources

Comments

developerWorks: Sign in

Required fields are indicated with an asterisk (*).


Need an IBM ID?
Forgot your IBM ID?


Forgot your password?
Change your password

By clicking Submit, you agree to the developerWorks terms of use.

 


The first time you sign into developerWorks, a profile is created for you. Information in your profile (your name, country/region, and company name) is displayed to the public and will accompany any content you post, unless you opt to hide your company name. You may update your IBM account at any time.

All information submitted is secure.

Choose your display name



The first time you sign in to developerWorks, a profile is created for you, so you need to choose a display name. Your display name accompanies the content you post on developerWorks.

Please choose a display name between 3-31 characters. Your display name must be unique in the developerWorks community and should not be your email address for privacy reasons.

Required fields are indicated with an asterisk (*).

(Must be between 3 – 31 characters.)

By clicking Submit, you agree to the developerWorks terms of use.

 


All information submitted is secure.

Dig deeper into WebSphere on developerWorks


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=1
Zone=WebSphere
ArticleID=334408
ArticleTitle=Event stream processing with WebSphere eXtreme Scale
publish-date=09032008