IBM®
跳转到主要内容
    中国 [选择]    使用条款
 
 
Select a scope: Search for:    
    首页    产品    服务与解决方案     支持与下载    个性化服务    
跳转到主要内容

developerWorks 中国  >  WebSphere  >

使用 WebSphere eXtreme Scale 处理事件流

developerWorks
文档选项

未显示需要 JavaScript 的文档选项

样例代码

英文原文

英文原文


级别: 中级

Dr. Alan Hopkins, 高级 IT 专家, IBM

2008 年 11 月 24 日

Journal icon IBM® WebSphere® eXtreme Scale 公开了一组丰富的 API,允许对驻留在分布式弹性高性能缓存中的数据进行访问。这些 API 支持各种应用程序编程模式。其中一个模式支持将重复缓存更新作为事件的时间序列处理。所编写的用于监视这些时间序列的应用程序能够将较低级别的事件进行相关和聚合,以帮助实时了解对业务具有重大意义的情况。本文描述一个基于 WebSphere eXtreme Scale 的简单场景,演示了异类事件流的实时处理。

来自 IBM WebSphere Developer Technical Journal

引言

现代企业体系结构的一个常见特征是其中包含了来自不同源的组件,这些组件基于不一致的基础技术,分别在不同的时间实现。让人毫不惊讶的是,所得到的基础设施经常过于复杂,而无法在 IT 级完全理解,更不用说在较高的业务抽象级别进行理解了。

在这些异类系统中活动的出现方式代表了今天现代企业内部(或更广范围内)的情况。由于采用完全独立的执行线程,这些活动将频繁地以彼此独立的方式出现。总的说来,这些异类事件代表经常没有得到重视的对业务威胁和机会的看法来源。

乍看之下,事件这个术语可能有些让人摸不着头脑。此术语既用来描述出现的活动,也用来描述计算机系统中出现的活动的表示形式。讨论事件处理时,通常允许该术语存在多种含义。希望使用此术语的上下文足以帮助避免出现混淆的情况。

将计算机内重要活动外部化的一个常见方法是通过发出描述活动的事件。事件的物理表示形式此时并不重要。重要的是,可以从综合考虑多个子系统的角度对在这些异类子系统中持续出现的活动进行分析。这样一组异类事件有时候被称为事件云

IBM WebSphere eXtreme Scale 提供了用于捕获和执行事件云的实时分析所需的功能。本文将给出一个 Web 应用程序的简单股票投资组合止损功能的实现。此实现可以利用 WebSphere eXtreme Scale 的一些关键功能,以促进异类事件流的实时处理。





回页首


场景:股票投资组合止损

此示例场景提供了简单的股票投资组合止损功能。在此场景中,客户股票投资组合包含任意数量的股票,可以使用美元或英镑进行交易。投资组合的估价以英镑表示,并会在每次传入股票报价机事件导致给定投资组合股票持有情况发生变化时都会以动态方式重新计算。


图 1. 股票投资组合止损场景
图 1. 股票投资组合止损场景

如果某个交易日投资组合聚合估价变化超过 10%,则将发出警报。在此示例中,警报实现为写入到系统日志的信息消息。(在更为实际的实现中,此警报可能会导致在业务监视仪表盘上显示警告,或启动自动交易来卖出所监视的部分或全部股票投资组合。)





回页首


WebSphere eXtreme Scale 基本信息

WebSphere eXtreme Scale 提供了运行时平台,可以在其中部署具有极高的性能、可伸缩性和可用性需求的应用程序。此技术提供的所有功能的全面讨论不在本文的讨论范围之内,但下面将对此解决方案中使用的一些功能进行简单的介绍。此信息提供上下文背景,用于帮助您理解示例场景的内容。(有关所有 WebSphere eXtreme Scale 功能的完整描述,请参见参考资料。)

  • 内存内缓存

    WebSphere eXtreme Scale 提供的集中性功能是内存内缓存,此缓存供驻留在各种异构计算机上的大量操作系统进程使用。此分布式缓存可以通过配置来提供企业级的服务质量,如性能、可伸缩性和弹性等。从管理的角度而言,这个内存内缓存的高级容器是 ObjectGrid。在 ObjectGrid 实例中,数据封装在一个或多个映射内。应用程序可以采用数据对象的形式(使用唯一的键进行标识)在这些映射中读取/写入数据。数据完整性通过 WebSphere eXtreme Scale 固有的事务功能予以保证。

    在我们的示例中,为了响应传入股票/外汇交易事件流中的离散事件,将使用 ObjectGrid 事件侦听器插件。因为这种类型的插件配置为在一个 ObjectGrid 实例范围内有效,因此将在我们的场景中使用两个配置的 ObjectGrid 实例。这样可以实现相对于传入事件的隔离。由于驻留在此缓存中的数据完全保存在内存中,因此使用 WebSphere eXtreme Scale 的一个主要好处是对缓存数据执行操作的速度。

  • 性能模型

    WebSphere eXtreme Scale 公开了一组丰富的 API,对通过大量应用程序编程模型访问缓存数据提供了全面支持。在此场景中,我们将使用其中的两个模式:

    • 在第一实例中,将使用简单映射 API 将条目写入到缓存中。WebSphere eXtreme Scale Map API 与 Java™ Collections Map API 类似,都可以将使用唯一键标识的条目写入映射对象中。
    • 此外,还将使用 WebSphere eXtreme Scale Stream Query API 来处理代表持续变化的股票和外币汇率的更新。这就允许将 WebSphere eXtreme Scale 映射的更新作为时间事件流处理。此功能是作为对核心映射功能的扩展提供的。对事件流处理的支持引入了流映射和视图映射的概念。流映射用于捕获一个或多个传入的原始事件流。从用于填充视图映射的原始数据流对数据元素进行提取和聚合的准确性通过使用流查询语言 (Stream Query Language) 表现出来,这是一种类似于 SQL 的语言,包含特定的扩展,用于帮助对时间事件流进行显式处理。这个原始事件流的处理由流处理技术(Stream Processing Technology,SPT)引擎进行处理。SPT 引擎输出写入到对应的流视图映射中。

    图 2 给出了通过这些 WebSphere eXtreme Scale 组件的处理流的图形表示形式。



    图 2. WebSphere eXtreme Scale 的事件流处理
    图 2. WebSphere eXtreme Scale 的事件流处理

  • ObjectGrid 事件侦听器

    WebSphere eXtreme Scale 的体系结构设计提供了可插入扩展点,可通过这些扩展点包含用户代码。ObjectGrid 事件侦听器扩展点允许包含出现重要 ObjectGrid 生命周期事件时调用的代码。示例场景使用此功能引入了一些代码,每次针对这里使用的两个 ObjectGrid 之一提交事务时,都将执行此代码。这个可插入的扩展点提供了用于包含代码的机制,以在每次收到股票报价机事件时重新计算聚合投资组合估价。





回页首


实现

下面将说明此示例场景的基本元素。此场景的开发工作使用以下软件产品执行:

  • IBM WebSphere Integration Developer Version 6.1(Java 透视图)
  • IBM WebSphere eXtreme Scale Version 6.1
  • IBM Java Runtime Environment Version 1.5.0

尽管开发场景中的基础 Java 运行时环境是 J2SE™,但此应用程序在基于 IBM WebSphere Application Server 的环境中也应该能够同样正常运行。

WebSphere eXtreme Scale 配置

使用两个补充配置文件配置 WebSphere eXtreme Scale。第一个文件 objectGrid.xml(清单 1)中包含此场景中使用的两个 ObjectGrid 的定义:

  • StocksGrid 用于封装包含外汇交易和股票报价器事件的传入事件流。在此 ObjectGrid 内,可定义四个支持映射:

    • 传入原始股票和外汇交易事件流分别写入到 stockStreamMapforexStreamMap
    • 由 STP 引擎派生这两个初始映射的视图数据(具体受到对应的流和视图规范的影响)分别写入到 stockViewMapforexViewMap

    可以在流定义中找到与场景相关的数据对象属性的规范。对来自这些流的数据的基于时间的聚合通过视图定义提供。

    下面的配置数据中还要注意的一点是,StocksGrid ObjectGrid 实例上的 ObjectGridEventListener 插件的规范。

  • CustomerPortfolioGrid 用于包含各个客户投资组合的详细信息。此 ObjectGrid 实例包含两个支持映射实例:通过 StockOwners 可确定任何给定股票的所有者,而通过 Portfolios 可检索所有监视的客户投资组合的详细信息。


清单 1. 配置文件 objectGrid.xml
				
<objectGrids>
  <objectGrid name="StocksGrid">
    <bean id="ObjectGridEventListener" className="wxs.plugins.ObjectGridEventListener"/>
    <backingMap name="stockStreamMap" streamRef="stockStream"/>
    <backingMap name="forexStreamMap" streamRef="forexStream"/>            
    <backingMap name="stockViewMap" viewRef="stockView"/>            
    <backingMap name="forexViewMap" viewRef="forexView"/>

    <streamQuerySet name="stockMonitoringSQS">
        <stream name="stockStream" 
     		valueClass="wxs.streamquery.example.StockQuote" 
     		sql="create stream stockStream keyed by t (price DECIMAL (9,2), 
                  tickerSymbol VARCHAR(100));"
           	access="FIELD" >
        </stream>
        <stream name="forexStream" 
         	valueClass="wxs.streamquery.example.ForexQuote" 
          	sql="create stream forexStream keyed by t (rate DECIMAL (9,4), 
                    type VARCHAR(100) );"
               access="FIELD" >
        </stream>
        <view name="stockView"
          sql="CREATE VIEW stockView AS SELECT tickerSymbol, avg(price) as avgPrice FROM 
               (SELECT * FROM stockStream FETCH LATEST 5 MINUTES) group by tickerSymbol"
          access="FIELD">
        </view>
        <view name="forexView"                    
              sql="CREATE VIEW forexView AS SELECT type, avg(rate) as avgRate FROM 
                   SELECT * FROM forexStream FETCH LATEST 5 MINUTES) group by type"
              access="FIELD">
        </view>                 
    </streamQuerySet>
</objectGrid>
        
<objectGrid name="CustomerPortfolioGrid">
     <backingMap name="StockOwners" readOnly="false" lockStrategy="PESSIMISTIC"/>
     <backingMap name="Portfolios" readOnly="false" />      	
   <ObjectGrid>      	
</objectGrids>

第二个配置文件 objectGridDeployment.xml 定义缓存拓扑的特征。此拓扑配置的基本元素如清单 2 中所示,其中显示了每个 ObjectGrid 实例都是由单个分区组成的。


清单 2. 配置文件 objectGridDeployment.xml
				
<objectgridDeployment objectgridName="StocksGrid">
     <mapSet name="mapSet1" numberOfPartitions="1"
         minSyncReplicas="0" maxSyncReplicas="0"
         maxAsyncReplicas="0" numInitialContainers="1">
         <map ref="stockStreamMap"/>   
   	  <map ref="forexStreamMap"/>   
	  <map ref="forexViewMap"/>     
	  <map ref="stockViewMap"/>
     </mapSet>
</objectgridDeployment>

<objectgridDeployment objectgridName="CustomerPortfolioGrid">
     <mapSet name="mapSet1" numberOfPartitions="1"
         minSyncReplicas="0" maxSyncReplicas="0"
         maxAsyncReplicas="0" numInitialContainers="1">
         <map ref="StockOwners"/>   
         <map ref="Portfolios"/>
     </mapSet>
</objectgridDeployment>

缓存对象数据模型

定义两组 POJO,以用于封装缓存在 ObjectGrid 实例中的数据:

  • 第一组对象用于保存描述客户、股票投资组合和股票持有情况的持有状态数据,这些对象是在场景初始化阶段定义的,在场景的整个生命周期中保持不变(图 3)。

    图 3. 静态数据模型
    图 3. 静态数据模型

  • 第二组数据对象表示股票和货币汇率事件,这些事件组成了此场景使用的事件流(图 4)。

    图 4. 事件流数据模型
    图 4. 事件流数据模型

代码构件

此实现涉及到表 1 中列出的 Java 代码构件。


表 1. 代码构件
类名称描述
wxs.streamquery.example.StockOwners封装拥有给定股票的投资组合 ID 集的 POJO。
wxs.streamquery.example.Portfolio封装客户投资组合的 POJO。
wxs.streamquery.example.StockQuote封装股票报价器事件的 POJO。
wxs.streamquery.example.ForexQuote封装汇率报价器事件的 POJO。
wxs.streamquery.example.StockHolding封装持有单支股票的投资组合的 POJO。
Wxs.streamquery.example.StatusFlag用于表示正在初始化的场景的 POJO。
wxs.streamquery.example.ScenarioInit启动止损场景的命令行 Java 程序。
wxs.streamquery.emitter.ForexEventStreamEmitter用于发出汇率报价器事件流的命令行 Java 程序。
wxs.streamquery.emitter.StockEventStreamEmitter用于发出股票报价器事件流的命令行 Java 程序。
wxs.streamquery.emitter.SingleStockEventEmitter用于发出单个股票报价器事件的命令行 Java 程序。
wxs.plugins.ObjectGridEventListener给定 ObjectGrid 实例的状态每次发生变化都会调用的侦听器程序。在此场景中,将使用此程序来对提交到 StocksGrid ObjectGrid 实例的更改作出反应。
wxs.utility.CustomerPortfolioGridHelper管理 CustomerPortfolioGrid ObjectGrid 独立实例的句柄获得情况的实用工具程序。
wxs.utility.StocksGridHelper管理 StocksGrid ObjectGrid 独立实例的句柄获得情况的实用工具程序。

接下来我们将对其中一些构件进行进一步的讨论。因为这里的目的是形成这些代码构件最重要方面的概貌,因此不会在这里提供组成此场景的所有 Java 程序的详细描述。不过,上表中所示的所有代码构件都可以在随本文提供的下载文件中找到。





回页首


场景初始化

此场景的初始化封装在 ScenarioInit 中,这是一个命令行程序,负责创建 StockOwners 和 Portfolio POJO 实例并将其写入 CustomerPortfolioGrid ObjectGrid 实例。稍后在 ObjectGridEventListener 运行时,会在止损情况检测期间使用此状态信息。场景初始化还负责为场景中使用的每个股票写入初始估价,并要将英镑到美元的初始货币汇率写入 StocksGrid ObjectGrid 实例。

状态数据定义

如果注意一下 ScenarioInit,会发现每个股票、投资组合和汇率的详细信息都已在类源代码启动时进行了硬编码。


清单 3. 股票投资组合和初始股票价格的定义
				
static String[] stockList = {"AIG",  "AXP",  "ATT",  "INTC",  "MOT",  "IBM", "CSCO",
                             "JAVA", "MSFT", "ORCL", "HBOS", "HSBA",  "BAY", "BGY",
                             "BLND", "GSK", "ITV", "LAND",  "III", "BT"};

static float[] stockPrices = {49.12f, 52.40f, 24.95f, 23.56f,  10.07f, 123.29f, 26.92f,
                              13.14f, 29.43f, 21.55f, 4.71f, 8.82f, 2.09f, 7.14f,
                              8.04f, 11.53f, 0.61f, 14.67f, 8.97f, 2.33f}; 

static int[][] portfolioVolumes = {		
     	{1000, 500, 2000, 850, 4000, 2000, 7000, 2500, 5000, 9000, 2000},
{1000, 2500, 4000, 9000, 500, 900, 600, 2000},
	{8000, 900, 2500, 1000, 2000, 3000},
	{3000, 1000, 2500, 5500, 500, 2000, 7500, 1000},
	{3000, 5500, 4000, 500, 3000, 7000, 9000},
	{3500, 2500, 9000, 6500, 3000, 1000, 2500, 1000},
	{1000, 7000, 8000, 3000, 500, 3500, 5500},
	{9000, 3000, 5500, 1000, 8000, 4000, 3000, 2000, 1500},
	{2500, 500, 4000, 3000, 9000, 8000, 7000, 6000},
	{5500, 1000, 2500, 500, 3000, 8000, 500, 3000}};

static String[][] portfolioStocks = {	
{"AIG", "AXP", "ATT", "INTC", "MOT", "IBM", "CSCO", "JAVA", "MSFT", "HBOS"},
{"IBM", "CSCO", "JAVA", "MSFT", "ORCL", "BAY", "BGY", "BLND"},
{"IBM", "CSCO", "JAVA", "GSK", "ITV", "LAND"},
{"ATT", "INTC", "MOT", "IBM", "CSCO","III", "BT", "HBOS"},
{"AIG", "ATT", "INTC", "MOT", "JAVA", "ORCL","HSBA"},
{"ATT", "INTC", "MOT", "IBM", "CSCO",  "ORCL", "BAY", "BGY"},
{"AIG", "AXP", "INTC", "JAVA", "ORCL","BLND", "GSK"},
{"AIG", "AXP", "INTC", "IBM", "CSCO", "JAVA", "ORCL","ITV","LAND"},
{"AIG", "ATT", "MOT", "CSCO", "MSFT", "ORCL","III","BT"},
{"AXP", "ATT", "INTC", "IBM", "MSFT", "ORCL","GSK", "ITV"}};
   	
float initialDollarToPoundsForexRate = 1.9876f;

股票所有权状态数据创建

清单 4 中的代码片段显示了可以如何将股票所有权数据写入到 CustomerPortfolioGrid ObjectGrid 实例中的 StockOwners 映射。对于场景中引用的每支股票(在清单 3 中的 stockList 数组中定义),都要创建一个 StockOwners 对象实例,其中带有设置为 stockList[] 中的股票报价器代码的 key 值。然后可以构造 customerIDs 数组,对于包含 StockOwners 对象实例表示的股票的投资组合,其中都包含一个对应的条目。在将对象写入 StockOwners 映射前,此数组设置为 customerIDs 属性的值。

运行了清单 4 中的代码后,对于 stockList[] 数组中定义的每支股票,StockOwners 映射都将包含一个对应的 StockOwners 对象实例。


清单 4. 使用静态数据填充 StockOwners 映射
				
portfolioGrid = CustomerPortfolioGridHelper.getOG("CustomerPortfolioGrid");
stocksGrid = StocksGridHelper.getOG("StocksGrid");
    	    		
/* -------------------------------------------------------- */
/* Get ObjectGrid Session                                   */
/* -------------------------------------------------------- */
Session portfolioSession = portfolioGrid.getSession();

/* -------------------------------------------------------- */
/* Write Stock Ownership StockOwners Map                    */
/* -------------------------------------------------------- */
ObjectMap ownersMap = portfolioSession.getMap("StockOwners");	 
        
portfolioSession.begin();        
        
// Create array of StockOwners objects
for (int i = 0; i < stockList.length; i++)
{
	ArrayList customerIDs = new ArrayList();
	StockOwners stockOwners = new StockOwners(stockList[i]);
	stockOwners.setTickerSymbol(stockList[i]);
        	
	for (int j = 0; j < portfolioStocks.length; j++)
	{
		for (int k = 0; k < ScenarioInit.portfolioStocks[j].length; k++)
    		{
    			if (ScenarioInit.portfolioStocks[j][k].equals(stockList[i]))
    			{
    				customerIDs.add(String.valueOf(j + 1));
    			}
    		}
    	}
    	stockOwners.setCustomerIDs(customerIDs);
    	ownersMap.put(stockOwners.getTickerSymbol(), stockOwners);   
}
portfolioSession.commit();

获得 ObjectGrid 句柄

在清单 4 中,获得所配置的 ObjectGridInstances CustomerPortfolioGrid 和 StocksGrid 的句柄的过程封装在两个类似的 Helper 类中。清单 5 显示了其中的一个 Helper 类的基本内容。


清单 5. 使用静态数据填充 Portfolios 映射
				
synchronized static public ObjectGrid getOG(String objectGrid)
{
   if (og == null)
   {			
      try
      {			
	 URL url = Thread.currentThread().getContextClassLoader().
		getResource("SQ_ObjectGrid.xml");
								
	 ClientClusterContext ccc = bjectGridManagerFactory.getObjectGridManager().
		connect("localhost:6000", null,url);
								
	 ObjectGridManager ogm = ObjectGridManagerFactory.getObjectGridManager();
	 
         og = ogm.getObjectGrid(ccc,objectGrid);
			
      }
      catch (Exception e)
      {
	  e.printStackTrace();
      }			
   }
   else
   {
      System.out.println("StocksGridHelper.getOG() - OG already initialised");
   }		
   return og;
}

客户投资组合状态数据创建

现在必须将定义客户股票投资组合的状态数据写入到 CustomerPortfolioGrid ObjectGrid 实例内的 Portfolios 映射中。同样,此数据也会在监视客户投资组合聚合值的变化时使用,以便提供止损功能。用于将投资组合状态数据写入 WebSphere eXtreme Scale 的代码片段如清单 6 中所示。会为每个投资组合计算初始聚合投资组合估值,并将此值复制到每个 portfolio 对象实例的 initialValuation 属性中。

清单 6 中的代码成功运行后,每个已定义的客户股票投资组合都有一个对应的 portfolio 对象实例写入到了 Portfolios ObjectMap 中。


清单 6. 使用静态数据填充 Portfolios 映射
				
ObjectMap portfoliosMap = portfolioSession.getMap("Portfolios");
        
for (int j = 0; j < ScenarioInit.portfolioStocks.length; j++)
{
 	float portfolioValuation = 0.0f;
 	Portfolio portfolio = new Portfolio(String.valueOf(j + 1));
 	ArrayList<StockHolding> stockHoldingList = new ArrayList<StockHolding>();	
    		
 	portfolioSession.begin();     
    		
 	for (int k = 0; k < ScenarioInit.portfolioStocks[j].length; k++)
 	{
 	   int numUnits = ScenarioInit.portfolioVolumes[j][k];
 	   StockHolding stockHolding = 
           new StockHolding(ScenarioInit.portfolioStocks[j][k]);
    	   stockHolding.setNumUnits(numUnits);
    				
    	   if (isUSStock(ScenarioInit.portfolioStocks[j][k]))
    	   {
    	      stockValuation = 
                   numUnits * initialStockPrice(portfolioStocks[j][k])/
                                                  initialDollarToPoundsForexRate;
    	   }
    	   else
    	   {
    	      stockValuation = numUnits * initialStockPrice(portfolioStocks[j][k]);
    	   }
    	   stockHolding.setLatestValuation(stockValuation);
    	   stockHoldingList.add(stockHolding);
    	   portfolioValuation += stockValuation;    			
    	}
    	portfolio.setStocks(stockHoldingList);
    	portfolio.setInitialValuation(portfolioValuation);    			
    	portfoliosMap.insert(portfolio.getCustomerID(), portfolio);  
}
portfolioSession.commit();

将初始事件写入事件流映射

接下来,必须将表示场景中每支股票的开盘价的对象插入到 stockStreamMap 中,将英镑到美元的汇率插入到 forexStreamMap 中,从而准备好股票和货币交易事件流。

插入这些事件所需代码的基本元素如清单 7 中所示。


清单 7. 场景初始化:插入初始事件
				
Session stockStreamSession = stocksGrid.getSession();

ObjectMap stockStreamMap = stockStreamSession.getMap("stockStreamMap");
ObjectMap stockViewMap = stockStreamSession.getViewMap("stockViewMap");
    		
stockStreamSession.begin();
        
StockQuote.QuoteCurrency currency = null;
    		
for (int i = 0; i < stockList.length; i++)
{
 	if (i < 10)
 	{
 		currency = StockQuote.QuoteCurrency.USDOLLAR;
 	}
 	else
 	{
 		currency = StockQuote.QuoteCurrency.GBPOUND;
 	}

 	stockStreamMap.insert(stockList[i], new StockQuote(stockList[i], 
      stockPrices[i], 
      currency));  
}

    		
// Forex
forexStreamMap.insert("USDOLLAR_TO_GBPOUND", new ForexQuote("USDOLLAR_TO_GBPOUND",
        initialDollarToPoundsForexRate));

stockStreamSession.commit();

ObjectGridEventListener

正如前面所述,WebSphere eXtreme Scale 的设计提供了一系列扩展点,可用于对此技术的缺省功能进行扩展。可以在整个 WebSphere eXtreme Scale 应用程序生命周期中指定代码模块在特定的定义点运行。

提供了 ObjectGridEventListener 插件模块,可用于将在事务生命周期管理期间的策略点调用的代码包含进来。通过利用这个功能,可以引入在每次事务提交到 StocksGrid ObjectGrid 实例(所有股票和外汇交易报价器事件都写入到其中)时运行的功能。因此,ObjectGridEventListener 的实现是这里给出的事件驱动的场景的核心。此插件负责处理表示派生自股票和外汇交易报价器视图更新的事件。

聚合投资组合价值重新计算工作是在对 stockView 视图更新之后执行的,此视图派生自 stockStream 事件报价器流。这些聚合值使用按时间计算的平均股票和外汇交易值(分别派生自各自的事件流)进行计算。

endTransaction() 方法最重要的内容如清单 18 中所示。(为了简单起见,对这里所示的代码进行了浓缩,仅意在说明代码如何工作的本质。有关完整的代码清单,请参见所附的下载文件。)封装在给定事务范围内的更改的 collection 对象作为参数传递给此方法调用。代码的第一部分重点处理如何确定按时间计算的最近平均外汇交易值。为了检索此值,将使用相互关联的实体 API 对象来最终获得当前的平均值。


清单 8. ObjectGridEventListener:检索按时间计算的平均外汇交易值
				
public void transactionEnd(String txid, boolean isWriteThruEnabled,
   boolean committed, Collection changes) 
{

 Iterator logSequenceIterator = changes.iterator();

 while (logSequenceIterator.hasNext())
 {
    LogSequence logSequence = (LogSequence)logSequenceIterator.next();
    String mapName = logSequence.getMapName();
    Iterator forexChangeIterator = logSequence.getAllChanges();

    if ((mapName.equals("forexViewMap") && logSequence.isDirty()))
    {

	while (changeIterator.hasNext())
	{
	  try
	  {
	   LogElement logElement = (LogElement)changeIterator.next();
	   Tuple forexKeyTuple = (Tuple)(logElement.getCacheEntry().getKey());
	   String key = (String)forexKeyTuple.getAttribute(0);
	   ogSession = instrumentGrid.getSession();
	   ObjectMap forexViewMap = ogSession.getViewMap("forexViewMap");
	   EntityMetadata emd = forexViewMap.getEntityMetadata();
	   TupleMetadata keyMD = emd.getKeyMetadata();
	   Tuple forexTuple = keyMD.createTuple();
	   forexTuple.setAttribute(0, "USDOLLAR_TO_GBPOUND"); //Could use key value
	   Tuple forexValueTuple = (Tuple)forexViewMap.get(forexTuple);
	   timeAveragedForexValuation = (Float)forexValueTuple.getAttribute(0);
	}
	catch ( Exception e)
      {......}
    }
  }
 }
}

然后需要处理刚刚提交的事务是由于传入股票报价器事件引发的情况,即导致 transactionEnd() 方法调用的事务提交针对的是 stockView 映射。对于这种情况,要首先建立事件的键(设置为股票报价器代码),然后检索此报价器代码的当前平均价格估价。


清单 9. ObjectGridEventListener:检索按时间计算的平均股票值
				
else if (mapName.equals("stockViewMap") && logSequence.isDirty())
{
   while (changeIterator.hasNext())
   {
      try
      {
	   LogElement logElement = (LogElement)changeIterator.next();
	   LogElement.Type type = logElement.getType();	
	   Tuple keyTuple = (Tuple)logElement.getCacheEntry().getKey();
	   String key = (String)keyTuple.getAttribute(0);

	   Session instrumentSession = instrumentGrid.getSession();
	   instrumentSession.beginNoWriteThrough();
	   ObjectMap stockViewMap = instrumentSession.getViewMap("stockViewMap");

	   EntityMetadata emd = stockViewMap.getEntityMetadata();
	   TupleMetadata keyMD = emd.getKeyMetadata();
	   Tuple ibmKey = keyMD.createTuple();
	   ibmKey.setAttribute(0, key);

	   Tuple ibmValue = (Tuple) stockViewMap.get(ibmKey);
	   Float timeAveragedValuation = (Float)ibmValue.getAttribute(0);

 	   info("Stock Ticker Event :: Average price in the last 5 minutes for " +
                                               key + " = " + timeAveragedValuation);

	   instrumentSession.commit();

建立了股票报价器代码和当前平均股票价后,可以开始重新计算持有此股票的所有投资组合的聚合值。首先,检索持有当前处理的事件所涉及的股票的投资组合列表(清单 10)。


清单 10. ObjectGridEventListener:检索包含当前股票的投资组合
				
/* ------------------------------------------------------------- */
/* Retrieve List of Portfolios that hold this stock              */
/* ------------------------------------------------------------- */
ObjectMap portfoliosMap = portfolioSession.getMap("Portfolios");

StockOwners stockOwners = (StockOwners) holdingsMap.get(key);

ArrayList portfolioIDs = stockOwners.getCustomerIDs();

Iterator portfolioIterator = portfolioIDs.iterator();

然后,遍历返回的投资组合列表中持有的每支股票并使用最新的平均股票和汇率价格重新计算投资组合估价(清单 11)。


清单 11. ObjectGridEventListener——重新计算聚合投资组合估价
				
while (portfolioIterator.hasNext())
{
portfolioSession.begin();
	String portfolioID = (String)portfolioIterator.next();	
	Portfolio portfolio = (Portfolio)portfoliosMap.get(portfolioID);
	ArrayList stocksList = portfolio.getStocks();
	ArrayList updatedStocksList = new ArrayList();
	Iterator stocksIterator = stocksList.iterator();

	float newPortfolioValuation = 0.0f;
	while (stocksIterator.hasNext())
	{
		StockHolding stockHolding = (StockHolding) stocksIterator.next();
		ibmKey.setAttribute(0, stockHolding.getTickerSymbol());
		ibmValue = (Tuple) stockViewMap.get(ibmKey);
		timeAveragedValuation = (Float)ibmValue.getAttribute(0);
		info("Stock Ticker Event :: Average price in the last 5 minutes for " 
			+ stockHolding.getTickerSymbol() + " = " + timeAveragedValuation);

		boolean usStockFlag = isUSStock(stockHolding.getTickerSymbol());
		if (usStockFlag)
	{
	   newStockValuation = stockHolding.getNumUnits() *
             timeAveragedValuation.floatValue()/timeAveragedForexValuation;
		}
		else
		{			        			
		   newStockValuation = stockHolding.getNumUnits() * 
                                                timeAveragedValuation.floatValue();
		}
		newPortfolioValuation += newStockValuation;
	}
	portfolioSession.commit();

最后,检查每个投资组合的聚合估价相对于其原始值的变化是否超过 10%(清单 12)。


清单 12. ObjectGridEventListener:检查投资组合估值变化是否超过 10%
				
float relativeChange = newPortfolioValuation/portfolio.getInitialValuation();

/* ---------------------------------------------------------------- */
/* If relative portfolio valuation  is > 1.10 or < 0.90 raise Alert */
/* ---------------------------------------------------------------- */
if ((relativeChange > 1.10) || (relativeChange < 0.90))
{
info("!************************************************************");
	info("!*");
	info("!* Portfolio Valuation Alert");
	info("!* Portfolio ID = " + portfolioID );
	info("!* Initial Valuation = " + portfolio.getInitialValuation());
	info("!* New Valuation = " + newPortfolioValuation);
	info("!* Relative Valuation Change = " + relativeChange);
	info("!*");
	info("!* Alert Triggering Stock = " + key);
	info("!*");
	info("!************************************************************");
}





回页首


发出事件

为了演示此股票投资组合止损功能的事件驱动特征,必须能够模拟事件的发出。此功能由一组命令行实用工具程序提供,这些程序实际上是 WebSphere eXtreme Scale 客户端程序。因为所有这些程序都采用基本上一致的方式编码,所以仅在清单 13 中给出了这些程序中之一 StockEventStreamEmitter 的重要代码。


清单 13. 发出事件
				
ObjectMap stockMap = getObjectMap("stockStreamMap");

for (int i = 0; i < numEvents; i++)
{
int stockIndex = random(stockList.length);
	int priceIndex = random(stockPrices[stockIndex].length);
	float stockPrice = stockPrices[stockIndex][priceIndex];

	if (stockIndex < 10)
	{
		quoteCurrency = StockQuote.QuoteCurrency.USDOLLAR;
	}
	else
	{
		quoteCurrency = StockQuote.QuoteCurrency.GBPOUND;
	}

	StockQuote stockQuote = new StockQuote(stockList[stockIndex],
		stockPrice, quoteCurrency );

	stockMap.update(stockQuote.getTickerSymbol(), stockQuote);

	Thread.sleep(sleepTime);
}

对于本示例,此代码使用报价器和对应的价格(以伪随机方式从硬编码的可能值选择)以迭代方式实例化 StockQuote 对象。这些 StockQuote 对象会随后写入到定义为股票事件流基础的 ObjectMap。





回页首


场景执行

请注意,此事件流处理场景的执行通过使用封装一系列可执行文件调用的脚本进行了简化。此场景测试期间使用的脚本在随附的下载文件中提供,可以进行适当修改,以运行您自己的示例场景。

启动目录服务器

WebSphere eXtreme Scale 使用目录服务器来向运行时拓扑的分布式样式提供基本服务。从 ObjectGrid bin 目录中执行与以下所示类似的命令,从而从命令行启动目录服务器实例:

C:\IBM\ObjectGrid\ObjectGrid\bin>startCatalog.bat cs1 hostname 6000

请注意,此命令将启动名为 cs1 的目录服务器实例,侦听端口 6000。请使用运行此场景的计算机名称替换 hostname。查找与以下所示类似的消息,以确认目录服务器是否已成功启动:

[22/05/08 11:30:18:328 BST] fc00fc ServerImpl I CWOBJ1001I: ObjectGrid Server cs1
is ready to process requests.

启动容器服务器

缓存数据驻留在一个或多个容器服务器的 JVM 堆中。本场景中的容器服务器还提供了运行时容器,SPT 引擎和 ObjectGridEventListener 都将在其中运行。因此,您需要确保可执行代码构件在容器服务器类路径上可用。

这些文件打包在随本文提供的下载文件中:

  • EventStreamProcessing.jar
  • ObjectGrid.xml
  • ObjectGridDeployment.xml

要执行此场景,将有必要把 EventStreamProcessing.jar 添加到 WebSphere eXtreme Scale ContainerServer 的类路径中。将上面所述的 ObjectGrid.xml 和 ObjectGridDeployment.xml 配置文件复制到文件系统中相应的位置,并在容器服务器启动脚本中指定其上的参数。在我们的测试环境中,使用了清单 14 中所示的容器服务器启动脚本。


清单 14. 容器服务器启动脚本
				
call "%~dp0setupCmdLine.bat"

"C:\IBM\ObjectGrid\java\jre/bin/java" "-Xmx512m" 
"-classpath" "C:\IBM\ObjectGrid\java\lib\tools.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\objectgrid.jar;
C:\IBM\ObjectGrid\ObjectGrid\session\lib\sessionobjectgrid.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\cglib.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\ogstreamquery.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\castor.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\commons-io.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\mx4j.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\mx4j-remote.jar;
C:\IBM\ObjectGrid\ObjectGrid\lib\mx4j-tools.jar;
C:\IBM\ObjectGrid\ObjectGrid\properties;
c:\IBM\ObjectGrid\EventProcessing\EventStreamProcessing.jar" 

"com.ibm.ws.objectgrid.InitializationService" %1 
-objectgridFile c:\IBM\ObjectGrid\EventProcessing\ObjectGrid.xml 
-deploymentPolicyFile c:\IBM\ObjectGrid\EventProcessing\ObjectGridDeployment.xml 
-catalogServiceEndpoints localhost:6000

调用此脚本时指定任意容器服务器名称,例如,如果脚本驻留在 c:\IBM\ObjectGrid\ObjectGrid\bin 中,则需要使用容器服务器名称 c0:

c:\IBM\ObjectGrid\ObjectGrid\bin startEventProcessingContainer.bat c0

成功启动容器服务器后,应该看到确认所配置的两个 ObjectGrid 实例的可用性输出消息,如清单 15 中所示。


清单 15. 成功启动容器服务器
				
[22/05/08 11:31:47:671 BST]   fc00fc ServerImpl    I CWOBJ1001I: ObjectGrid Server c0 is
 ready to process requests.
[22/05/08 11:31:48:312 BST] 4b8e4b8e ReplicatedPar I CWOBJ1511I: CustomerPortfolioGrid:I
BM_SYSTEM_ENTITYMANAGER_MAPSET:0 (primary) is open for business.
[22/05/08 11:31:48:343 BST]  4d204d2 ReplicatedPar I CWOBJ1511I: CustomerPortfolioGrid:m
apSet1:0 (primary) is open for business.
[22/05/08 11:31:48:390 BST] 1d781d78 PeerManagerIm I CWOBJ8601I: PeerManager found peers
 of size 1
[22/05/08 11:31:48:390 BST] 1d781d78 ServerAgent   I CWOBJ7206I: New leader is (9.145.21
.53:3912). Old leader was (<null>).
[22/05/08 11:31:48:390 BST] 1d781d78 ServerAgent   I CWOBJ7203I: Leader changed.  New le
ader (9.145.21.53:3912) is elected in core group (DefaultZoneCG0) and re
ported to catalog server.
[22/05/08 11:31:48:515 BST]  4d204d2 ReplicatedPar I CWOBJ1511I: StocksGrid:mapSet1:0 (p
rimary) is open for business.
[22/05/08 11:31:48:578 BST] 4b8e4b8e ReplicatedPar I CWOBJ1511I: StocksGrid:IBM_SYSTEM_E
NTITYMANAGER_MAPSET:0 (primary) is open for business.

初始化场景

容器服务器成功启动后,就有必要对场景进行初始化,这其中涉及到将状态数据插入 CustomerPortfolioGrid ObjectGrid 实例,以定义投资组合股票持有情况。此工作还会将初始股票和外汇交易报价器事件写入到 StocksGrid ObjectGrid 实例中。提供了用于进行此工作的命令行程序。可以从命令行使用以下命令运行此程序,前提是当前目录包含 EventStreamProcessing.jar,且在 PATH 变量中指定了 Java 5。


清单 16. 写入初始股票和外汇交易报价器事件
				
java "-classpath" "c:\IBM\ObjectGrid\ObjectGrid\lib\castor.jar;
c:\IBM\ObjectGrid\ObjectGrid\lib\commons-io.jar;
c:\IBM\ObjectGrid\ObjectGrid\lib\objectgrid.jar;
c:\IBM\ObjectGrid\ObjectGrid\lib\ogstreamquery.jar;
c:\IBM\ObjectGrid\java\lib\tools.jar;EventStreamProcessing.jar"
wxs.streamquery.example.ScenarioInit

发出事件流

确保 ScenarioInit 成功运行后,就可以开始发出股票和外汇交易报价器事件。同样,我们在下载文件中提供了进行此工作的示例实用工具程序:

  • wxs.streamquery.emitter.StockEventStreamEmitter
  • wxs.streamquery.emitter.ForexEventStreamEmitter

提供了示例脚本,其中封装了执行其中每个实用工具程序所需的命令字符串。请注意每个实用工具均需要两个参数:第一个参数指定要发出的事件数量,第二个参数指定前后两个事件间的事件延迟(以毫秒为单位)。

例如,要以 500 毫秒为间隔发出 500 个股票报价器事件,则输入:

C:\IBM\ObjectGrid\EventProcessing>emitStockTickerEvents.bat 100 500

要以 500 毫秒为间隔发出 500 个外汇交易报价器事件,则输入:

C:\IBM\ObjectGrid\EventProcessing>emitForexTickerEvents.bat 100 500

为了帮助演示此场景,提供了一个实用工具程序来将 Windows® 平台上的“tail”函数与突出显示功能进行结合。该程序配置为显示写入到容器服务器 SystemOut.log 文件的输出,并以对比色显示股票和外汇交易输出,得到的稳定输出与以下所示类似:


图 5. 按时间计算的平均股票和外汇交易值输出
图 5. 按时间计算的平均股票和外汇交易值输出

为了演示投资组合止损检测功能,必须发出相应的事件,以让所计算的聚合投资组合值超出允许的范围。同样也提供了用于进行此工作的示例程序和脚本。

打开命令窗口并执行 emitSingleStockTickerevent.bat 脚本。请注意,此参数接受两个参数:股票的报价器代码和股票值:

C:\IBM\ObjectGrid\EventProcessing>emitSingleStockTickerEvent.bat IBM 10.0

在我们的开发环境中,这已足够触发投资组合估价警报,会在我们的日志显示(突出显示实用工具程序)中得到以下输出。(可能有必要注入几个值较低的股票报价器事件,才能让平均值低到足够触发警报的程度。)


图 6. 聚合投资组合估价警报的输出
图 6. 聚合投资组合估价警报的输出





回页首


下载

描述名字大小下载方法
Code sampleEventStream_Attachment_23072008.zip52 KBHTTP
关于下载方法的信息


参考资料



关于作者

Alan Hopkins 的照片

Alan 是 IBM Software Group Services for WebSphere 的高级 IT 专家,具有超过 16 年的中间件和 Internet 相关技术方面的经验。在他的职业生涯中,大部分时间都将工作重点放在事务中间件系统及其在电子商务中间件领域的应用上。最近,Alan 在专门从事业务流程集成的重要策略领域方面的研究。他尤其对基于开放标准的方法在业务流程集成领域的应用感兴趣,包括面向服务的体系结构的发展及其在基于企业服务总线的基础设施上的部署。目前,Alan 是 Business Integration Solution Services Team 的成员,该团队隶属位于英国的 IBM Hursley Park Laboratory 的 IBM Software Services for WebSphere。




对本文的评价










回页首


IBM 公司保留在 developerWorks 网站上发表的内容的著作权。未经IBM公司或原始作者的书面明确许可,请勿转载。如果您希望转载,请通过 提交转载请求表单 联系我们的编辑团队。
    关于 IBM 隐私条约 联系 IBM 使用条款