WebSphere Operational Decision Management V7.5 的 Generic Ruleset Signature 模式简介,第 3 部分: 集成 Hadoop 与 IBM Operational Decision Management V8

本系列的第三部分将介绍如何使用 Generic Ruleset Signature 模式将 IBM® Operational Decision Management 与 Apache™ Hadoop™ 集成,以便在分布式计算解决方案中执行业务规则。 本文来自于 IBM Business Process Management Journal 中文版

Nigel T. Crowther, 高级 IT 顾问, IBM

Nigel T. Crowther 是 IBM Software Services 中负责 WebSphere BPM Practice 的一名高级 IT 顾问,专攻业务规则。他在设计和开发业务规则与业务流程管理系统方面有 10 多年的经验,在 IT 领域有 25 年的经验。Nigel 参与过所有业务领域的工作,他的工作重点主要在财务系统上。Nigel 于 2009 年因 ILOG 收购而加入 IBM。



Jonathon Carr, 资深 IT 顾问, IBM

Jonathon Carr 是 IBM Software Services 中负责 WebSphere BPM Practice 的一名资深 IT 顾问,专攻业务规则和可视化。Jonathon 在设计、构建和记录业务规则和 GUI 应用程序方面有 10 多年的经验。他于 2009 年因 ILOG 收购而加入 IBM。



2013 年 1 月 05 日

简介

本文提供一个将 Apache Hadoop 与 IBM Operational Decision Management (IBM ODM) 业务规则相集成的解决方案。本文使用了本系列的 第 1 部分第 2 部分 中描述的 Generic Ruleset Signature (GRS) 模式。首先,我们将提供一个通用集成模式,然后我们将展示使用该模式的一个交易验证示例。最后,我将展示如何运行该示例。

Hadoop 和 IBM ODM 是什么?

Hadoop 是一个开源平台,设计用于解决大量非结构化数据的问题。IBM ODM 是一个供业务用户用于操作数据的平台。二者的结合为业务用户创建了一个操作大量数据的强大工具。

Hadoop 的核心是 MapReduce 框架,它可以并行处理数据。并行处理是结合 Hadoop Distributed File System (HDFS) 来实现的,后者管理数据的分发。我们将简单讨论这些组件,有关的更多细节还请参阅 Hadoop 文档

为什么集成一个业务规则管理系统 (BRMS)?

开发 Hadoop 应用程序的简便方法是使用 Java™ 或 Pig 或 Hive 等脚本语言构建业务逻辑。可以引入一个 BRMS,使业务逻辑可外部化到规则中,并以一种对业务友好的方式维护它。这种方法具有两个关键优势:

  • 可以对业务进行更改。
  • 它为更改流程带来了敏捷性和可管理性。

ODM Hadoop 集成指南

本节介绍将 ODM 业务规则与 Hadoop 集成的 6 条指南:

  1. 从 Map 函数调用规则

    使用 Map 函数调用业务规则。Map 函数是数据转换发生的主要位置,是业务规则最可能应用的位置。Reduce 函数主要用于组合数据,所以它对业务规则而言不那么有用。

  2. 使用一个低级规则引擎

    使用原生的规则引擎 API IlrContext。这是最低级的 API;它具有最小的大小,最适合 Hadoop 架构。它允许 Hadoop 而不是 Rule Execution Server 控制规则的分发。

  3. 使用平面数据

    使用平面数据,比如 CSV(逗号分隔值)或制表符分隔的文件。平面数据很紧凑,容易操作,且容易转换为一个哈希图而被规则引擎摄入。可向平面数据中添加更多属性,而无需更改规则集签名。请参见指南 4。

  4. 使用 Generic Ruleset Signature 模式来执行规则

    使用 Generic Ruleset Signature (GRS) 模式,无需更改代码即可添加和删除数据列。有关 Generic Ruleset Signature 模式的更多信息,请参阅本系列的 第 1 部分第 2 部分

  5. 批处理数据

    批处理 CSV 数据有助于提升性能,减少规则引擎上下文切换。无需一次一行地将数据发送给引擎,可批量发送数据。有关设置理想的批处理大小的更多细节,请参阅 第 2 部分

  6. 将所有功能构建到一个 JAR 中

    Hadoop 的一条关键原则是将可执行文件发送给数据,这比将数据移动到可执行文件中更快。考虑到这条原则,整个 Hadoop 作业会打包在一个 JAR 中。该 JAR 文件包含 MapReduce 类、规则引擎、Hadoop 核心和业务规则,如图 1 所示。

    图 1. MapReduce 作业的内容
    MapReduce 作业的内容

运行 Hadoop 作业

任务管理器负责协调 MapReduce 作业。它确保为每个 Map 作业都提供了一个称为分段 (split) 的数据部分。Map 将分段发送给规则引擎的一个实例,在那里参照规则来处理它。规则创建了一些组合到 Reduce 作业中的临时数据,如图 2 所示。

图 2. 链接规则引擎、Map 和 Reduce
链接规则引擎、Map 和 Reduce

一个交易验证示例

本节提供一个名为 Trade Validator 的交易验证示例,该示例基于 上面 描述的指南。交易验证是 Hadoop 和 ODM 集成的一个不错示例,因为数据量很大且规则频繁更改。

交易数据输入

Trade Validator 的输入是一系列 CSV 分段文件,从 file01.csv 开始,到 file04.csv 结束。图 3 显示了 file01.csv 的一部分。请注意,CSV 文件的格式与 图 6 中定义的输入参数有关联。

图 3. 输入 CSV 文件格式
输入 CSV 文件格式

在图 3 中,在第 206 行上有一个错误的交易,其中 transaction 属性为 XXX。这不符合规定一次交易应为 BUYSELL 的规则,因此会生成一个验证错误。验证错误被附加到名为 validationErrors.csv 的输出文件中。一个验证错误包含分段文件名、行号、客户 ID、失败的属性和原因。图 4 显示了输入与输出之间的关联。

图 4. 与输入分段关联的验证错误
与输入分段关联的验证错误

顶级规则项目

顶级规则项目名为 grs-ruleflow-batch,如图 5 所示。它的输入和输出参数是一个哈希图批处理。该批处理中的每个交易都会被处理,直到所有交易都处理完。

图 5. 顶级规则项目
顶级规则项目

输入参数

图 6 中定义了规则引擎的输入参数。它们是在 4 个 GRS 参数表中定义的,一个表对应一个原语类型:Integer、String、Double 和 Date。

图 6. 输入变量
输入变量

输出参数

规则引擎输出包含一个或多个 ErrorStructuresErrorStructure 包含错误代码、错误消息和参数的名称,如图 7 所示。

图 7. 错误结构
错误结构

验证规则

清单 1 显示了验证业务规则。

清单 1. 交易验证规则
//Check Transaction Date
if
    the date transactionDate is after today
then
    add error for transactionDate reason "Date is in future" ;
    
//Check Total
definitions
    set price to the decimal stockPrice ;
    set volume to the integer units ;
    set checkTotal to price * volume ;
if
    the decimal total is not checkTotal
then
    add error for total reason "Total is not correct.  Expected " + checkTotal + 
    " but was " + the decimal total;    
        
//Check Transaction Type
if
    the string transaction is not one of { BUY , SELL }
then
    add error for transaction reason "Invalid transaction" ;
    
//Check Stock Name
if
    the string stockName is not one of { IBM , XYZ }
then
    add error for stockName reason "Invalid stockname" ;

MapReduce 代码

本节将介绍 MapReduce 作业的 Java 代码和规则引擎的集成层。

ValidateTrades 类

清单 2 中的 ValidateTrades 类的 run 方法在 Hadoop 作业启动时调用。在第 12 行上,为该作业提供了配置参数和 class 的名称。在第 18 行上,映射器被设置为后面将要介绍的 TradeMap 类。第 19 和第 20 行被注释掉,以便使用默认联合程序 (combiner) 和缩减程序 (reducer)。在第 25 和第 26 行上,设置了输入和输出目录。最后在第 28 行上运行作业。

清单 2. ValidateTrades 类
09 public class ValidateTrades extends Configured implements Tool {
10
11        public int run(String[] args) throws Exception {
12                JobConf conf = new JobConf(getConf(), ValidateTrades.class);
13                conf.setJobName("TradeValidator");
14
15                conf.setOutputKeyClass(Text.class);
16                conf.setOutputValueClass(Text.class);
17
18                conf.setMapperClass(TradeMap.class);
19                // conf.setCombinerClass(Reduce.class);
20                // conf.setReducerClass(Reduce.class);
21
22                conf.setInputFormat(TextInputFormat.class);
23                conf.setOutputFormat(TextOutputFormat.class);
24
25                FileInputFormat.setInputPaths(conf, new Path(args[0]));
26                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
27
28                JobClient.runJob(conf);
29
30                return 0;
31        }
32
33        public static void main(String[] args) throws Exception {
34                int res = ToolRunner.run(new Configuration(), new ValidateTrades(),
35                                args);
36                System.exit(res);
37        }
38 }

TradeMap 类

清单 3 中的 TradeMap 类包含调用规则的 Hadoop map 方法。

在第 17 行上,创建了 RuleEngineRunner。它在第 27 行上的一个静态初始化器中进行初始化,以便只需在映射作业开始时创建一次。第 31 行上的 map 方法接受了一行 CSV 数据。它在第 39 行上将数据转换为一个键/值对映射。该映射在第 40 行上添加到 batch 中。如果该批处理是满的,或者您位于文件的末尾(第 43 行),批处理会发送到规则引擎进行处理(第 45 行)。批处理有助于提高规则引擎的性能,减少调用数量。输出结果是一个包含验证错误的 CSV 字符串(第 45 行)。如果错误数量不为 0(第 47 行),那么会将错误添加到输出收集器中(第 50 行)。输出收集器将会收集错误,这些错误最终会写入输出文件中。在第 54 行上,将当前的状态发送给了报告程序。

清单 3. TradeMap 类
14 public class TradeMap extends MapReduceBase implements
15 Mapper<LongWritable, Text, Text, Text> {
16
17      private static RuleEngineRunner runner = new RuleEngineRunner();
18      static final String EOF = "EOF";
19      private Text id = new Text();
20      private Text errors = new Text();
21      private long numRecords = 0;
22      private long numValidationErrors = 0;
23      private String inputFile;
24      private Batch batch = new Batch();
25
26      static {
27              runner.initialise();
28      }
29
30      @Override
31      public void map(LongWritable key, Text value,
32                      OutputCollector<Text, Text> output, Reporter reporter)
33                      throws IOException {
34
35              String csvLine = value.toString();
36
37              if (!csvLine.startsWith(EOF)) {
38
39                      Map<String, String> parameters = MapFactory.csvToMap(csvLine);
40                      batch.addItem(parameters);
41              }
42
43              if (batch.isFull(++numRecords) || csvLine.startsWith(EOF)) {
44
45                      String result = runner.runValidationRules(batch.getBatch());
46
47                      if (result.length() > 0) {
48                              errors.set(result);
49                              id.set(inputFile);
50                              output.collect(id, errors);
51                              ++numValidationErrors;
52                      }
53
54                      reporter.setStatus("Processed " + numRecords + " records from "
55                                      + inputFile + " with " + numValidationErrors
56                                      + " validation errors");
57
58                      batch.clear();
59              }
60      }
61
62      public void configure(JobConf job) {
63
64              inputFile = job.get("map.input.file");
65
66              int fileNamePos = inputFile.lastIndexOf('/') + 1;
67              inputFile = inputFile.substring(fileNamePos);
68
69              String colsStr = job.get("column.names", "");
70              MapFactory.getColumns(colsStr);
71
72      }

MapFactory 类

清单 4 中的 MapFactory 类是一个静态工厂,用于从 CSV 数据构建 Java 哈希图。第 18 行上的 getColumns 方法将列名称的逗号分隔列表令牌化。可以从命令行获取这些名称,并使用它们将这些键填充到哈希图中。

第 39 行中的 csvToMap 方法将 CSV 数据的逗号分隔列表令牌化。此数据用于将值填充到哈希图中。

清单 4. MapFactory 类
07 public class MapFactory {
08
09      int MAX_COLS = 20;
10      private static String[] cols = new String[MAX_COLS];
11
12      /**
13       *
14       * @colsStr get the column names from the input parameters and 
15       *          copy it into an internal structure
16       * @return void
17       */
18      public static void getColumns(String colsStr) {
19
20              int i = 0;
21              StringTokenizer itr = new StringTokenizer(colsStr, ",");
22
23              // Get Column Names. This will be the key values in the 
24              // parameter hash map
25              while (itr.hasMoreTokens() && i < MAX_COLS) {
26                      cols[i] = itr.nextToken();
27                      System.out.println("Col[" + i + "]=" + cols[i]);
28                      i++;
29              }
30      }
31
32      /**
33       *
34       * @param row
35       *            of CSV data representing a set of parameters to be
36       *            passed to the rule engine
37       * @return parameters as a HashMap
38       */
39      public static Map<String, String> csvToMap(String row) {
40
41              // Get parameters which are defined on a row of CSV data.
42              StringTokenizer itr = new StringTokenizer(row, ",");
43
44              Map<String, String> args = new HashMap<String, String>();
45              int i = 0;
46
47              while (itr.hasMoreTokens() && i < MAX_COLS) {
48
49                      String valueStr = itr.nextToken();
50
51                      // cols is the key containing the parameter name
52                      args.put(cols[i++], valueStr);
53              }
54
55              return args;
56      }
57 }

清单 5 中的 Batch 类封装一个将由规则引擎处理的数据批处理。第 7 行上设置了常量 BATCH_SIZE。这控制一个批处理中的行数。应该设置该常量,这样才能让批处理适合 JVM 中的物理内存。在第 8 行上,将批处理创建为一个映射列表。每个映射包含一行 CSV 数据。在第 17 行上,方法 addItem 向批处理添加了一个行。在第 31 行上,方法 isFull 确定批处理是否已满并准备供处理。

清单 5. Batch 类
05 public class Batch {
06
07 private static int BATCH_SIZE = 100;
08      private java.util.List<Map<String, String>> batch = 
09                               new java.util.ArrayList<Map<String, String>>();
10
11      /**
12       * Add a CSV row to the batch
13       * 
14       * @param parameters
15       *            The parameters to add to the batch
16       */
17      public void addItem(Map<String, String> parameters) {
18              batch.add(parameters);
19      }
20
21      /**
22       * Returns <code>true</code> if the batch is the right size for processing
23       * and <code>false</code> otherwise.
24       * 
25       * @param numberOfRecords
26       *            The number of records currently in the batch
27       * @return <code>true</code> if the batch is full and ready for processing
28       *         and <code>false</code> otherwise.
29       * 
30       */
31      public boolean isFull(long numberOfRecords) {
32
33              return ((numberOfRecords % BATCH_SIZE) == 0) ? true : false;
34      }
35
36      /**
37       * Clear the batch.
38       */
39      public void clear() {
40              batch.clear();
41      }
42
43      /**
44       * @return the real batch
45       */
46      public java.util.List<Map<String, String>> getBatch() {
47              return batch;
48      }
49 }

RuleEngineRunner 类

RuleEngineRunner 是规则引擎 API 之上的一个轻量型包装器。以下是此类中的重要方法。

initialise 方法

在清单 6 中,initialise 方法读取了 validationRules.jar 中的规则(第 146 行),该规则使用了一个相对路径引用,因为 validationRules.jar 嵌在 Hadoop 作业 JAR 中。规则集加载到了规则引擎中(第 150 行),然后验证并构建(第 153 行)。执行这两行代码后,即可调用规则引擎。

清单 6. RuleEngineRunner 的 initialise 方法
143        public void initialise() {
144
145             // The relative path to the validation rules 
146             final String rulesetPath = "../validationRules.jar";
147
148             try {
149                     // Load the rule set
150                     loadRuleset(rulesetPath);
151
152                     // Prepare an engine for this rule set
153                     buildEngine();
154
155             } catch (Exception exception) {
156                     exception.printStackTrace();
157             }
158     }

loadRuleset 方法

在清单 7 中,loadRuleset 方法使用低级引擎 API 加载并分析验证规则。第 43-47 行创建了一个流来读取规则集。第 49 行创建一个空 IlrRuleset。第 53 行初始化规则集分析器。第 55 行分析规则集,第 60-68 行处理错误。

清单 7. RuleEngineRunner 的 LoadRuleset 方法
42      public void loadRuleset(String rulesetPath) throws Exception {
43
44              JarInputStream is = new JarInputStream(
45                       RuleEngineRunner.class.getResourceAsStream(rulesetPath));
46
47              IlrRulesetArchiveLoader rulesetloader = new IlrJarArchiveLoader(is);
48              IlrRulesetArchiveParser rulesetparser = new IlrRulesetArchiveParser();
49
50              ruleset = new IlrRuleset();
51
52              System.out.println("*****Loading Ruleset: " + rulesetPath);
53
54              rulesetparser.setRuleset(ruleset);
55
56              if (rulesetparser.parseArchive(rulesetloader)) {
57                      return;
58              }
59
60              String[] errors = rulesetparser.getErrors();
61
62              if (errors != null) {
63                      StringBuffer buf = new StringBuffer();
64                      for (int i = 0; i < errors.length; i++) {
65                              buf.append(errors[i]).append('\n');
66                      }
67                      System.out.println("*** ERROR Loading Ruleset: " + ruleset);
68                      throw new Exception(buf.toString());
69              }
70      }

BuildEngine 方法

在清单 8 中,buildEngine 方法使用低级引擎 API 创建规则引擎的一个实例。在第 76 行,使用验证规则创建了一个 IlrContext。第 78-87 行设置错误处理函数。

清单 8. RuleEngineRunner 的 BuildEngine 方法
75      public void buildEngine() throws IlrBadContextException {
76              engine = new IlrContext(ruleset);
77
78              IlrExceptionHandler exceptionHandler = new IlrExceptionHandler() {
79                      public boolean handleException(IlrUserRuntimeException ex) {
80                              if (ex.isInConditions())
81                                      return false;
82                              else
83                                      throw ex;
84                      }
85              };
86              engine.setExceptionHandler(exceptionHandler);
87              engine.setRuleflowExceptionHandler(exceptionHandler);
88      }

RunValidationRules method

在清单 9 中,runValidationRules 方法设置批处理的规则引擎参数(第 140-146 行)。对批处理执行这些规则(第 147 行)。来自规则引擎的输出是一个映射列表(第 150 行)。每个映射包含一次交易的一个或多个验证错误。输出附加到 validationResponse(第 158-159 行)中,其中包含该批处理的所有错误。

清单 9. RuleEngineRunner 的 RunValidationRules 方法
134     @SuppressWarnings("finally")
135     public String runValidationRules(List<Map<String, String>&t; batch) {
136
137             StringBuffer validationResponse = new StringBuffer();
138             try {
139
140                     IlrParameterMap inputs = new IlrParameterMap();
141                     
142                     // Set the request Parameter
143                     inputs.setParameter("request", batch);                  
144
145                     // Execute the rules
146                     engine.setParameters(inputs);
147                     IlrParameterMap outputs = engine.execute();
148
149                     // Retrieve the validation errors
150                     java.util.List<?> errors =  
151                           (java.util.List<?>) outputs.getObjectValue("errorList");
152
153                     for (Iterator<Map> errorsIter = 
154                        (Iterator<Map>)errors.iterator();errorsIter.hasNext();) {
155                             Map errorsMap = (Map) errorsIter.next();
156
157                             // iterate over values only
158                             for (Object error : errorsMap.values()) {
159                                     validationResponse.append(", " + error);
160                             }
161
162                             validationResponse.append("\n");
163
164                     }
165
166             } catch (Exception exception) {
167     }       exception.printStackTrace();
168             } finally {
169                     // Reset is necessary to execute the engine with a new batch
170                     reset();
171                     return validationResponse.toString();
172             }
173     }
174 };

调用 Hadoop 作业

可从命令行使用图 8 中所示的语法初始化 Trade Validator 作业。

图 8. 调用 Trade Validator Hadoop 作业
调用 Trade Validator Hadoop 作业

备注:此命令作为本文的 下载 部分中的一个文本文件提供。

运行状态和完成

当 Hadoop 作业正在运行时,您可以在 Hadoop 控制台中看到每个任务的状态。图 9 中的状态屏幕显示了 4 个映射任务的信息:3 个已完成,一个完成了 90%。

图 9. 查看 Trade Validator Hadoop 作业的状态
查看 Trade Validator Hadoop 作业的状态

在 Ubuntu 上运行 Hadoop Trade Validator

在本节中,我们将介绍在 Ubuntu 上运行 Hadoop Trade Validator 的步骤。首先,您将使用所提供的规则和数据运行 Hadoop Trade Validator,然后您将修改规则和数据并重新运行它。

入门

开始之前,完成以下步骤,为您的环境做好准备。

  1. 下载并安装 Ubuntu
  2. 下载 hadoop-1.0.3-bin.tar.gz
  3. 安装 IBM Operational Decision Management V8。
  4. 在 /home/[username] 中创建一个名为 Hadoop 的目录。
  5. 将 Hadoop TAR 解压到 /home/[username]/Hadoop 中。这将创建一个 hadoop-1.0.3 目录。
  6. 下载本文所提供的 tradeValidator.tar.gz,将它复制到 /home/[username]/Hadoop。这会创建两个目录:tradeValidator 和 tradeValidatorRules。
  7. 在 tradeValidator 目录中,编辑 build.properties。设置 wodm.home.dir,使其指向您的 IBM Operational Decision Management 8.0 安装目录;例如:

    wodm.home.dir=/home/myusername/IBM/WODM80

    另外设置 hadoop.lib.dirhadoop.core,使其指向您的 Hadoop 安装。如果您执行了前面的步骤,这应该是正确的;例如:


    hadoop.lib.dir=../hadoop-1.0.3
    hadoop.core=hadoop-core-1.0.3.jar
  8. 使用以下命令安装 Apache Ant: sudo apt-get install ant1.7
  9. 现在使用以下命令安装 JDK: sudo apt-get install openjdk-7-jdk
  10. 在 /home/[username]/Hadoop/tradeValidator 目录中,通过键入以下命令复制该代码:ant

    您应看到消息 BUILD SUCCESSFUL

  11. 编辑 hadoop-1.0.3/conf 目录中的 hadoop-env.sh,确保 JAVA_HOME 与您的安装相匹配;例如:
    export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-i386

运行 Hadoop Trade Validator

在 /home/[username]/Hadoop/tradeValidator 目录中,通过发出以下命令,为 run.sh 提供可执行文件权限,然后在单机模式下运行 Hadoop 作业:

chmod +x run.sh
./run.sh

结果会写入输出文件夹和屏幕上。预期的验证错误如图 10 所示。

图 10. 来自 Hadoop 作业的输出
来自 Hadoop 作业的输出

修改 Trade Validator 规则和输入数据

在本节中,我们将修改一些规则,输入数据,然后重新运行 Trade Validator。

导入规则项目

要导入规则项目,请执行以下操作:

  1. 将文件夹 tradeValidatorRules 导入 Rule Designer 中。您会看到该项目,如图 11 所示。
    图 11. Trade Validator 项目
    Trade Validator 项目
  2. 单击 Finish。所有项目都应该无错误地构建。

添加一个新输入参数

要添加一个新输入参数,请执行以下操作:

  1. 编辑 tradeValidatorRules/grs-bom/resources 目录中的 Domains.xlsx 文件,并将一个新的域项添加到 String 工作表 currency 中,如图 12 所示。
    图 12. 新的货币输入参数
    新的货币输入参数
  2. 在同一个电子表格中,插入一个名为 CurrencyName 的新工作表,其中包含图 13 中的内容。要加快操作速度,可以复制 StockName 表的内容,将值 IBM 更改为 USD,将值 XYZ 更改为 GBP
    图 13. 货币名称
    货币名称
  3. 保存和关闭 Domains.xlsx 文件。
  4. 回到 Rule Designer 中,右键单击 tradeValidatorRules/grs-bom/resources 并刷新文件夹。
  5. 打开 grs-bom 规则项目的 bom/domains/dynamic/dt/parameters 中的 StringParameter 类。
  6. 单击 Synchronize 链接,挑选新的 currency 域项。图 14 显示了具有新的 currency 域值的 StringParameter 类。单击 Save
    图 14. 更新的 StringParameter 域
    更新的 StringParameter 域
  7. 在 StringDomains 包中,添加一个名为 CurrencyNameDomain 的新类。确保该类拥有默认的动词化表达,使用 java.lang.String 作为 Execution nameSuperclass
  8. 单击 Create a domain 并选择 Excel Dynamic Domains
  9. 选择 Domains.xlsx 文件并创建值,如图 15 所示。
    图 15. 创建 CurrencyName 域
    创建 CurrencyName 域
  10. 单击 Finish,然后单击 Save。最终的类定义如图 16 所示。
    图 16. CurrencyNameDomain 类
    CurrencyNameDomain 类

    查看图 16 的大图。)

  11. 配置新的 currency 输入参数,向 grs-bom 项目中的 rules/parameters/StringParameters 决策表添加一行,如图 17 所示。
    图 17. 配置货币字符串参数
    配置货币字符串参数
  12. 将一个名为 CheckCurrency 的新操作规则添加到 Validation 包的 grs-rules 规则项目中,如清单 10 所示。
清单 10. CheckCurrency 规则
if
    the string currency is not one of { GBP , USD }
then
    add error for currency reason "Invalid Currency"
  1. 重新生成规则集归档文件,方法是右键单击 grs-ruleflow-batch 规则项目并选择 Export => Ruleset archive。指定 validationRules.jar 作为归档文件名称,并将它保存在 /home/[username]/Hadoop/tradeValidator/resources 中。
  2. 要修改输入数据,请在 tradeValidator/input 目录中使用以下命令将 USD 添加到 CSV 文件的每一行末尾: sed -i 's/$/,USD/' *.csv
  3. 编辑 file01.csv,修改第二到最后一行,使货币值为 AUD,而不是 USD。注意:不要修改最后一行,它是文件结尾 (EOF) 指示符。

重新运行 Hadoop Trade Validator

在 /home/[username]/Hadoop/tradeValidator 中,通过键入 ant,从命令行重新构建代码。

您会看到消息 BUILD SUCCESSFUL。此构建版本将最新的验证规则合并到 Hadoop JAR 文件中。

在同一个目录中,编辑 run.sh 文件并将 currency 附加到 column.names 参数上,如下所示:

hadoop jar tradevalidator.jar
tradevalidator.ValidateTrades
-Dcolumn.names=lineNumber,customerId,customerName,stockName,
transaction,transactionDate,units,stockPrice,total,currency 
input output

从同一个目录,通过键入以下命令,对 Trade Validator 重新运行 Hadoop 作业: ./run.sh

结果被写入 output 文件夹中和屏幕上。预期的验证错误应如图 18 所示。请注意,无效的货币 AUD 已在 file01.csv 中检测出来。

图 18. 重新运行 Hadoop 作业的输出
重新运行 Hadoop 作业的输出

结束语

我们有关业务规则的 GRS 模式的由三部分组成系列文章到此结束了。第 1 部分 简单概述了该模式和它的优缺点。在 第 2 部分 中,我们针对真实用途巩固了 GRS 的原始实现。在最后这篇文章中,我们演示了一个真实示例,展示了如何将 GRS 用于 Hadoop 来进行大规模交易验证。


致谢

感谢 Pierre Berlandier 和 Richard G. Brown 审核本文,特别感谢 Scott Gray 提供了实现 Hadoop 解决方案的专家经验。


下载

描述名字大小
带业务规则的 Hadoop 交易验证器tradeValidator.zip78KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere
ArticleID=929321
ArticleTitle=WebSphere Operational Decision Management V7.5 的 Generic Ruleset Signature 模式简介,第 3 部分: 集成 Hadoop 与 IBM Operational Decision Management V8
publish-date=01052013