金融市场前台办公室中的 IBM 技术,第 2 部分: 基于分析数据流调用条件业务规则

集成 IBM InfoSphere Streams 和 IBM WebSphere ILOG JRules

这是一个系列文章的第二篇,着重于 InfoSphere™ Streams 和 WebSphere® ILOG® JRules 的集成。InfoSphere Streams 是一个高性能的流处理引擎,能够实时执行数据流的计算和分析。ILOG JRules 是业务规则管理系统,允许创建基于规则的应用程序。本文说明了一个简单的算术贸易场景,其中 InfoSphere Streams 对市场数据流进行操作,并且在特定情况下,使用 ILOG JRules 调用业务规则。为了执行此任务,您将学习如何以有效的方式将这两个产品集成在一起。

Nick Schofield, 软件开发人员, IBM

Nick Schofield 的照片Nick Schofield 是一名软件开发人员,在多伦多的 IBM Software Development Lab 的 IBM Software Services for WebSphere organization 团队工作。他参与过各种项目,包括性能测试/调整、应用程序开发和产品集成。



Mary Taylor, IT 咨询专家, IBM

Mary Taylor 作者照片Mary 在 IBM 已经超过 25 年了。她从事过各种岗位,包括程序员、系统工程师、项目主管、DBA 和数据质量专家。在过去四年中,她一直在 SWG Technical Strategy 团队工作。在那段时间,她主要关注 Software as a Service。现在,她管理 SWG Incubation 计划。在过去 18 个月中,她领导一个名为 Botticelli 的项目,该项目着重于 IBM 中间件在金融市场前端办公室中的定位。



2010 年 11 月 29 日

简介

本文是一个 系列文章 的第二篇,该系列文章研究如何可以集成 IBM 中间件功能来解决金融市场前端办公室的技术需求。前端办公室需求非常需要能够以极快的速度处理大量数据的专门软件。本系列文章讲述了一个算术贸易场景,其作为 IBM Software Group 的孵化项目的一部分而实现。

InfoSphere Streams

InfoSphere Streams (Streams) 是一个应用程序开发环境和运行时,允许您创建实时对数据流进行操作的应用程序。Streams 应用程序以一种高级别流处理语言进行定义。每个 Streams 应用程序都是一系列算子,对流过的数据执行函数或计算。然后这些算子链在一起以便执行复杂的函数。将包括许多执行函数的算子,例如数据聚合、联接、分割和复杂函数。Streams 还允许您创建自定义算子来执行特定指令。这些自定义算子可以用 Java 或 C++ 进行编码,它们被称为用户定义的算子(UDOP)。本文讲述如何创建 Java UDOP 来与 ILOG JRules 集成。

WebSphere ILOG JRules

ILOG JRules (JRules) 是一种业务规则管理系统(BRMS),允许您设计、开发和部署可以从外部系统或应用程序访问的业务规则。使用 Rule Studio 开发环境或 Rule Team Server(RTS)Web 应用程序开发业务规则。然后将规则部署到 Rule Execution Server(RES)。应用程序可以连接到 RES 并在那里执行规则,或者获得规则并在本地执行它们。由于性能原因,本文中的场景使 Streams Java UDOP 获得规则并在本地执行它们。

先决条件

为了完成此示例场景,您需要安装下列软件组件:

  • ILOG Rule Studio Development Environment
  • ILOG Rule Team Server
  • ILOG Rule Execution Server
  • InfoSphere Streams

关于场景

本文讲述了一个示例场景,其中 JRules 和 Streams 需要配合工作。您可以遵循该场景使其在您自己的环境中工作。

该场景开始为一个 Streams 应用程序,其接收股票交易和报价。然后,您将对此市场数据执行一些简单计算来生成名为交易指数 的值。如果该值大于某个阈值,您将自动对此股票下单。为了防止在未经批准的情况下下任何大型订单,您将修改该 Streams 应用程序,将任何大型订单请求首先传送给 JRules。然后,JRules 将确定是否应该自动履行某个订单。


使用 ILOG Rule Studio Development Environment 创建业务规则

完成下列步骤来使用 ILOG Rule Studio ILOG Rule Studio 创建业务规则。

创建 XOM

创建 JRules 应用程序时要做的第一件事是定义执行对象模型(XOM)。XOM 定义您将对 JRules 进行的调用的输入和输出。对于本场景,您使用简单 Java® 对象(POJO)作为接口。还可以使用 XML 模式来定义 XOM。

完成下列步骤来使用 POJO 创建 XOM 开放 Rule Studio:

  1. 通过为 Project name 输入 OrderDecision-XOM 来创建一个新 Java 项目,如图 1 中所示,然后单击Next
    图 1. OrderDecision-XOM 项目创建
    屏幕截图:New Java Project 窗口,Project Name 字段中为 OrderDecision-XOM
  2. 通过在 Name 字段中输入 OrderDecision 在 com.ibm.orderdecision 包中创建一个新 Java 类,如图 2 中所示。
    图 2. OrderDecision Java 类
    屏幕截图:New Java Class 窗口,Name 字段中为 OrderDecision
  3. 在 OrderDecision 类的 Interfaces 字段中添加下列字段,然后单击 Finish
    • private Boolean canContinue;
    • private String symbol;
    • private Double price;
    • private Double qty;
  4. 按照清单 1 中显示的用于 symbol 字段的方法为以上 4 个字段创建 getter 和 setter。
清单 1. 创建 getter 和 setter
	    public String getSymbol() {
	        return symbol;
	    }
	    public void setSymbol(String symbol) {
	        this.symbol = symbol;
	    }

canContinue 字段包含规则执行的结果并确定该订单是否为异常。如果订单为异常,则 canContinue 的值为 0。符号、价格和数量将从 Streams 程序中处理的市场数据传递到 ILOG。

创建规则

完成下列步骤来创建将执行的实际业务规则:

  1. 通过在 New Rule Project 窗口中为 Project name 输入 OrderDecisionRules 来创建新的标准规则项目,如图 3 中所示。
    图 3. OrderDecisionRules 项目创建
    屏幕截图:New Rule Project 窗口,OrderDecisionRules 作为 Project name
  2. 打开 Rule Project Map 视图,单击 Design Entity 中的 Import XOM,如图 4 中所示。
    图 4. Import XOM
    屏幕截图:高亮显示 Import XOM
  3. 选择 Java Execution Object Model
  4. 选择 OrderDecision-XOM,然后单击 OK
  5. 单击 Rule Project Map 视图中 Design Entity 中的 Create BOM(业务对象模型)。BOM 包含将输入 Java 或 XML 代码转换为规则引擎中使用的业务术语,如图 5 中所示。
    图 5. Design Entity - 步骤 1
    Design Entity,其中高亮显示 Import XOM (1) 和 Create BOM
  6. 接受第一个页面中的默认值,确保在 New BOM Entry 窗口中的 Select classes 下选中 com.ibm.orderdecision 包,如图 6 中所示。
    图 6. BOM Entry
    屏幕截图:New BOM Entry 窗口,包名称左边显示对勾
  7. 接受第三个页面上的默认值,然后单击 OK
  8. 单击 Rule Project Map 视图中 Design entity 中的 Define Parameters,如图 7 中所示。
    图 7. Design Entity - 步骤 2
    Design Entity,其中高亮显示 Import XOM (1) 和 Define Parameters
  9. 添加表 1 中显示的规则集参数。
表 1. 规则集参数
名称 类型 方向 默认值 描述
symboljava.lang.StringIN符号
pricedoubleIN价格
qtydoubleIN数量
canContinueBooleanOUTfalse可以继续
  1. 单击 Orchestrate entity 中的 Add Rule Package
  2. 单击 Add rule package 来添加规则集参数,如图 8 中所示。
    图 8. 用于添加规则包的 Orchestrate entity
    Orchestrate entity,高亮显示 Add Rule Package
  3. 将包命名为 com.ibm.orderdecision
  4. 单击 Orchestrate entity 中的 Add Ruleflow,如图 9 中所示。规则流表示应用程序业务逻辑。规则流用于通过将规则分组为一系列任务来编排规则执行。
    图 9. 用于添加规则流的 Orchestrate entity
    Orchestrate entity,其中高亮显示 Add Rule Package(3) 和 Add Ruleflow
  5. 按如下所示设置规则流选项:
    1. 输入 /OrderDecisionRules/rules 作为 Source folder。
    2. 输入 com.ibm.orderdecision 作为 Package。
    3. 输入 OrderDecisionRuleFlow 作为 Name。
    4. 输入 RuleFlow 作为 Type。
    5. 单击 Finish
  6. 单击 Rule Project Map 视图中的 Author entity 中的 Add Business Rule,如图 10 中所示。
    图 10. 用于添加业务规则的 Author entity
    屏幕截图:Add Business Rule,其中高亮显示 Add business rule
  7. 按如下所示设置业务规则属性:
    1. 输入 /OrderDecisionRules/rules 作为 Source folder。
    2. 输入 com.ibm.orderdecision 作为 Package。
    3. 输入 Order Decision Rule 作为 Name。
    4. 输入 ActionRule 作为 Type。
    5. 单击 Finish
  8. 如清单 2 中所示设置规则代码。
清单 2. 订单决策规则
if
     price * quantity is less than 500 or symbol is "IBM"
then
     set 'can continue' to true;

此代码为下列订单将 canContinue 标志设置为 true:所有 IBM 订单以及价格与数量相乘小于 500 的其他订单。Streams 中的这些订单都是非异常订单(或直接处理订单)。

  1. 编辑您之前创建的 OrderDecisionRuleFlow。
  2. 在选项板中输入开始元素、结束元素和规则任务元素,然后将它们连接在一起,如图 11 中所示。
    图 11. 创建规则流图表
    绿点表示开始,指向 task_1,其指向表示结束的红点
  3. 在 Rule Task 下将 ID 更改为 Order Decision
  4. 将规则 com.ibm.orderdecision.Order Decision Rule 添加到 Properties 选项卡中的 Rule Selection 部分,如图 12 中所示。
    图 12. 更新规则部分
    屏幕截图:提示在规则任务中列出规则和规则包

将规则部署到 Rule Team Server

一旦完成了规则项目,就可以将其发布到 Rule Team Server (RTS)。使用 RTS 中的规则,业务用户可以更改您设置的值。例如,如果有太多订单被视为异常,业务用户可以将阈值从 500 提高到 1000。

要部署到 RTS,请完成下列步骤:

  1. 在左边的规则浏览器面板中右键单击 OrderDecisionRules 项目,然后单击 Rule Team Server > Connect
  2. 输入您的连接信息。
  3. 确保选中 Create a new project on Rule Team Server,然后单击Finish

完成!业务用户现在可以登录到 RTS 并编辑您刚刚发布的规则。

将规则发布到 Rule Execution Server

为了执行规则,您必须将规则从 RTS 发布到 Rule Execution Server (RES)。RES 提供两种功能:

  • 它可以远程执行规则,然后将响应发送回调用者。
  • 它可以将 RES 用作规则存储库。

对于示例场景,您将连接到 RES 并获得最新版本的规则,但是为了获得最高性能,您将在您的 InfoSphere Streams 环境内本地执行这些规则。

创建 ruleapp

ruleapp 是一种打包格式,用于在 RES 上部署规则。一旦您创建了规则集,它就将打包在 ruleapp 内并部署到 RES 执行环境中。完成下列步骤来创建 ruleapp:

  1. 登录到 RTS,然后单击 Configure > Manage RuleApps
  2. 单击 New,然后输入名称 OrderDecisionRuleApp
  3. 单击 Rulesets 下的 New,将其命名为 OrderDecisionRuleSet
  4. 选择 OrderDecisionRules 项目,然后单击 Save > Save 来保存 rule app。

发布 ruleapp

完成下列步骤来发布 ruleapp:

  1. 单击 Configure > Manage RuleApp
  2. 选中 OrderDecisionRuleApp 旁边的框,然后单击 Deploy
  3. 取消选中 Create a baseline for this deployment 旁边的框,然后单击 Next
  4. 选中 Deploy on a Rule Execution Server,然后单击 Next
  5. 选中 Increment ruleset(s) major version,然后单击 Next
  6. 从列表中选择执行服务器,然后单击 Deploy

开发和集成 Streams 应用程序

本节讲述如何使用 Streams 应用程序。

应用程序概述

下载 中提供的示例 Streams 应用程序计算交易指数。图 13 显示了从 StreamSight 工具获取的应用程序的图形视图,该工具随 InfoSphere Streams 提供。

图 13. Streams 应用程序
数据从左向右流动,如下所述

在图 13 中,数据从左边的来源算子进入,然后分为两个流。一个流用于交易,另一个流用于报价。然后交易流聚合并进行运算来计算成交量加权平均价(volume weighted average price,VWAP)。然后 VWAP 与报价流联接,并且计算交易指数。最后,如果交易指数值高于某个阈值,则生成订单。

对于示例场景,您将通过在图表右侧的最后两个算子之间放置一个算子来扩展此应用程序。此算子将调用您已经创建的 JRules 规则来决定是否可以继续执行订单。

扩展应用程序

完成下列步骤,通过添加 JavaUdop 算子来扩展应用程序。

  1. 通过输入 tar -xvzf orderDecision_orig.tar.gz,将附加的 Streams 应用程序 (orderDecision_orig.tar.gz) 解压缩到您的计算机上,例如名为 orderDecision_orig 的目录。
  2. 在新创建的 orderDecision_orig 目录中创建名为 classpath 的目录,然后复制下面显示的 jar。
    • 在 ILOG studio lib 目录中:
      • asm-3.1.jar
      • asm-analysis-3.1.jar
      • asm-commons-3.1.jar
      • asm-tree-3.1.jar
      • asm-util-3.1.jar
      • bcel-5.1.jar
      • jdom-1.0.jar
      • jrules-engine.jar
      • sam.jar
    • 在 db2 java 目录中:
      • db2jcc4.jar
    • 在 ILOG executionserver lib 目录中:
      • jrules-res-execution.jar
      • j2ee_connector-1_5-fr.jar
  3. 通过获取 下载 中的 ra.xml 文件来配置 JRules 连接。
  4. 将该文件放在 classpath 目录中。
  5. 编辑文件并针对 persistenceProperties 属性输入 JRules RES db 的用户名、密码和 URL。
  6. 在 vwap.dps 文件结尾的接收算子前面添加清单 3 中显示的两个算子,从而将 Java Udop 添加到 Streams 应用程序。vwap.dps 文件包含 图 13 中显示的 Streams 应用程序。
清单 3. 用于分割订单流的 Java UDOP
	    stream OrdersNotToVerify(schemaFor(Orders))
		:= Functor(Orders)
		     [price * qty <= 200d]
		     {}	

	    stream OrdersToVerify(schemaFor(Orders))
		:= Functor(Orders)
		     [price * qty > 200d]
		     {}

这些算子将订单流分割为两个单独的流。一个流包含非常小而不需要确认的订单。JRules 需要确认另一个流。

  1. 在 OrdersToVerify 算子后添加清单 4 中的算子,从而添加对 JRules 引擎的调用。
清单 4. 对 OrdersToVerify 算子的更新
	    stream CheckedOrders(schemaFor(Orders), canContinue: Boolean)
	     := JavaUdop(OrdersToVerify)
	     [generated;
	     className:"com.ibm.orderchecker.OrderDecision";
	     classLibrary:
	     "../orderDecision_orig/classpath/asm-3.1.jar",
	     "../orderDecision_orig/classpath/asm-analysis-3.1.jar",
	     "../orderDecision_orig/classpath/asm-commons-3.1.jar",
	     "../orderDecision_orig/classpath/asm-tree-3.1.jar",
	     "../orderDecision_orig/classpath/asm-util-3.1.jar",
	     "../orderDecision_orig/classpath/bcel-5.1.jar",
	     "../orderDecision_orig/classpath/j2ee_connector-1_5-fr.jar",
	     "../orderDecision_orig/classpath/jdom-1.0.jar",
	     "../orderDecision_orig/classpath/jrules-engine.jar",
	     "../orderDecision_orig/classpath/jrules-res-execution.jar",
	     "../orderDecision_orig/classpath/sam.jar",
	     "../orderDecision_orig/classpath/db2jcc4.jar",
	     "../orderDecision_orig/classpath",
	     "../orderDecision_orig/bin";
 	     vmArg:"-Xmx512m"] {}
  1. classLibrary 参数中将 classpath 目录的路径修改为您上面创建的路径。
  2. 在对 JRules 的调用后面,您需要筛选未通过检查的所有订单并修改 Sink 算子来打印确认的订单以及未检查的订单。通过将文件结尾的 Sink 算子替换为清单 5 中的代码来实现此操作。
清单 5. 修改 VerifiedOrders
	    stream VerifiedOrders(schemaFor(Orders))
	     := Functor(CheckedOrders)
	     [canContinue]
	     {}
	    
	    Null := 
	    Sink (OrdersNotToVerify, VerifiedOrders) ["file:///Orders.dat", nodelays]{}
  1. 保存 vwap.dps 文件。
  2. 从 orderDecision_orig 目录输入 make 命令来编译应用程序。
  3. 对示例场景成功完成编译后,可以看到下面内容:
    • 显示已经创建了 1 个作业和 12 个处理元素(PE)的摘要。
    • 之前创建的三个算子(OrdersToVerify.dpe、OrdersNotToVerify.dpe 和 CheckedOrders.dpe)。
    • 许多 shell 脚本。
    第一次编译 Streams 应用程序且其中具有 JavaUdop 算子时,应用程序自动在 src 目录下创建所需的 .java 和类文件。
  4. 通过将清单 6 中加粗的行添加到现有 OrderDecision.java 文件,修改 OrderDecision.java 文件来调用 ILOG JRules API。清单 6 中的备注说明正在添加的功能。如果您需要修改的 OrderDecision.java 文件,其包含在 下载 中。
清单 6. 筛选订单
package com.ibm.orderchecker;

import ilog.rules.res.model.IlrPath;
import ilog.rules.res.session.IlrJ2SESessionFactory;
import ilog.rules.res.session.IlrSessionRequest;
import ilog.rules.res.session.IlrSessionResponse;
import ilog.rules.res.session.IlrStatelessSession;

import com.ibm.streams.spade.OperatorContext;

public  class OrderDecision extends AbstractOrderDecision {


	private IlrPath path;


   /**
   ** Initialize the operator
   */
   @Override
   public void initialize(OperatorContext context) throws Exception {
	   super.initialize(context);
	
	//Set the path to the Rule. It is in the form /RuleApp/RuleSet
	this.path = 
	  IlrPath.parsePath("/OrderDecisionRuleApp/OrderDecisionRuleSet");
   }

   /**
   ** process method for port 0 (Stream IPort0Stream).
   */
   @Override
   protected void process0(IPort0 tuple) throws Exception {
	//Initialize a new JRules Session. If you are concerned about 
	//performance, this session can be re-used. However if a 
	//new version of the rules are published to the RES, they 
	//will not be picked up until the session is re-initialized
	IlrJ2SESessionFactory sessionFactory = 
	  new IlrJ2SESessionFactory();
	   sessionFactory.setClassLoader(getClass().getClassLoader());
	
	
	//Create the JRules Request Object
	IlrSessionRequest sRequest = sessionFactory.createRequest();
	sRequest.setRulesetPath(path);	
	sRequest.setInputParameter("symbol", tuple.get_symbol());
	sRequest.setInputParameter("price", tuple.get_price());
	sRequest.setInputParameter("qty", tuple.get_qty());

	IlrSessionResponse sResponse = null;
	try {
		//Execute the JRules Rule
		IlrStatelessSession ss = 
		  sessionFactory.createStatelessSession();
		sResponse = ss.execute(sRequest);
		
		//Parse the response
		boolean canContinue = 
((Boolean)sResponse.getOutputParameters().get("canContinue")).booleanValue();
		
		
		//Create the Output Tuple. You have to set all of 
		//the input params as well as the JRules result
		OPort0 otuple = getOutput0().newTuple();
		
		otuple.set_symbol(tuple.get_symbol());
		otuple.set_price(tuple.get_price());
		otuple.set_qty(tuple.get_qty());
		
		otuple.set_canContinue(canContinue);
		
		//Submit the result tuple
		submit0(otuple);
		
	} catch (Exception e) {
		e.printStackTrace();
	}
	
	}
}
  1. 输入命令 ./start_streams_vwap.sh 来启动 Streams 服务器。
  2. 输入命令 ./submitjob_vwap.sh 来提交作业。
  3. 使用 InfoSphere Streams 随附的 StreamSight 工具来查看应用程序,方法为打开 Eclipse 并选择 InfoSphere Streams Live Graph 透视图。
  4. 单击工具栏上的 Load 图标,然后选择 Spade。图 14 显示了所得的应用程序图表。
图 14. 修改的 Streams 应用程序
新数据流,在 Orders 函数后的路径中具有其他决策分支

在图 14 中,数据按照与图 13 中相同的方式从左向右流动,但是到了 Orders 函数时,该流分为两个路径,然后在 Sink 函数前再次合并。

  1. 运行应用程序。您修改了 orderDecision.java 文件来调用 ILOG 规则,如果价格与数量相乘小于 $500 或者如果符号为 IBM,则该规则将 canContinue 标志设置为 true。运行应用程序后,您将在 orderDecision_orig/data 目录中看到输出文件 Orders.dat。在该文件内,您应该仅看到符合您的 ILOG 规则的订单,如清单 7 中所示。
清单 7. 生成的订单文件
	    symbol:TWX,price:17.7,qty:1
	    symbol:UNH,price:64.6,qty:4
	    symbol:MRK,price:32.07,qty:3
	    symbol:VRX,price:18.67,qty:1
	    symbol:CD,price:16.75,qty:17
	    symbol:CVS,price:27.26,qty:10
	    symbol:CVS,price:27.26,qty:8
	    symbol:HAR,price:97.23,qty:1CH
	    symbol:IBM,price:83.59,qty:6
	    symbol:THC,price:7.93,qty:14
	    symbol:THC,price:7.93,qty:7
	    symbol:FSLb,price:26.04,qty:4
	    symbol:BHI,price:61.95,qty:3
	    symbol:LRY,price:43.67,qty:4
	    symbol:DG,price:19.18,qty:24
	    symbol:CEG,price:56.79,qty:1

结束语

本文说明了开发业务规则并将其部署到 JRules 应用程序以及从 InfoSphere Streams 应用程序使用此规则的流程。这两种技术的组合为业务用户提供了一种强大的方法,可以实时修改高性能处理引擎使用的规则,而无需开发或部署操作。


下载

描述名字大小
本文下载StreamsJRulesCodeDownloads.zip831KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management, WebSphere, Java technology, XML
ArticleID=592052
ArticleTitle=金融市场前台办公室中的 IBM 技术,第 2 部分: 基于分析数据流调用条件业务规则
publish-date=11292010