在 InfoSphere Streams 中集成 SPSS Model Scoring,第 1 部分: 从 InfoSphere Streams 运算符调用 Solution Publisher

在流数据应用程序中使用数据建模,实现可操作的商务智能

学习如何在实时评分环境中利用 SPSS 的预测能力。“在 InfoSphere Streams 中集成 SPSS Model Scoring” 系列文章的第 1 部分中,将介绍如何编写和使用 IBM® InfoSphere® Streams 运算符,利用 IBM SPSS Modeler Solution Publisher Runtime Library API 在 InfoSphere Streams 应用程序中执行 IBM SPSS Modeler 预测模型。

Mike Koranda, InfoSphere Streams 发布经理, IBM Global Business Services

Mike KorandaMike Koranda 是 IBM 的 Software Group 的一名高级技术人员,已经在 IBM 工作了 30 多年。最近 6 年,他一直在从事 InfoSphere Streams 产品的开发工作。



2012 年 1 月 09 日

开始之前

免费下载:IBM® InfoSphere Streams V2 试用版
下载更多的 IBM 软件试用版,并加入 IBM 软件下载与技术交流群组,参与在线交流。

关于本系列

InfoSphere Streams 是一个能对不断变化的数据进行实时分析的平台。IBM SPSS 系列产品提供了建立预测分析模型的功能。“在 InfoSphere Streams 中集成 SPSS Model Scoring” 系列文章可供需要在实时评分环境中利用功能强大的预测模型的 Streams 开发人员进行参考。

关于本教程

本教程将介绍如何创建一个可在 Streams 应用程序中使用并执行 SPSS 预测模型的 InfoSphere Streams 运算符。它提供了一个运算符和数据样例,以演示该集成功能。然后将介绍如何修改此样例,以便用于不同的 SPSS 模型。在 第 2 部分 中,您将学习如何扩展非泛型运算符来使用预测模型的 XML 元数据,从而允许在 Streams 中使用 SPSS 预测模型,无需具备定制所需的 C++ 技能。

目标

在本教程中,您将了解数据分析师在 SPSS Modeler 要做什么,然后才能准备好 Streams 评分预测模型,您还将看到 Streams 开发人员如何构建一个运算符来执行此模型,并了解 Streams 应用程序如何使用此运算符从流数据中生成实时评分结果。

前提条件

本教程供具有 Streams 编程语言和 C++ 技能的 Streams 组件开发人员和应用程序编程人员阅读。参考本教程或查看并执行其中的样例都可以了解此技术。如果要执行样例,则需要对 UNIX® 命令行 shell 以及 Streams 编程知识有一些了解。

系统要求

要运行此样例,需要配备一台装有 Red Hat Enterprise Linux® 的计算机,并安装 InfoSphere Streams V2.0 或其更高版本、IBM SPSS Modeler Solution Publisher 14.2 fixpack 1 和 Solution Publisher hot fix(预计在 2011 年 10 月 14 日发行)。


概述

为什么要集成 InfoSphere Streams 和 SPSS?

IBM SPSS Modeler 提供了一个可了解数据并生成预测模型的最先进的环境。InfoSphere Streams 提供了一个可伸缩的高性能环境,对不断变化的数据进行实时分析,这些数据中包括传统结构的数据和半结构化到非结构化数据类型。一些应用程序需要对存储的信息进行分析,并且以低延迟、高容量、实时的方式使用统计结果,从而实现计分功能。

角色和术语

在深入研究之前,最好先了解一些角色及其职责,并理解贯穿教程始终的一些术语。

角色

  • 数据分析员:知道如何使用 IBM SPSS Modeler 工具构建并发布预测模型的建模专家。
  • Streams 应用程序开发人员:负责构建应用程序的 InfoSphere Streams 开发人员。
  • Streams 组件开发人员:负责开发 Streams 应用程序开发人员使用的组件的 DeveloperInfoSphere Streams 开发人员。

本教程重点介绍 Streams 组件开发人员角色,以及如何编写可执行 SPSS 预测模型的运算符。同时也介绍了其他角色,因为需要通过他们了解整个方案中的工作进展并与他们进行相互交流。很多案例中,负责 Streams 应用程序开发和 Streams 组件开发的往往是同一个人。

术语

  • 预测模型:SPSS Modeler Stream 中的一套评分方法,有时也称作模型
  • Streams Processing Language (SPL):用来编写 InfoSphere Streams 应用程序的语言。
  • 运算符:InfoSphere Streams 应用程序使用的基本构成组件。Streams 产品本身已包含一组运算符,用户也可以编写自己的自定义运算符。

请注意:当提到术语 “流” 时,可能会让人感到困惑。InfoSphere Streams 中会提到数据 和用 SPL 构建的流应用程序。SPSS Modeler 产品创建了已连接建模器组件的工作流,也称为 “流”。在本教程中,SPSS 建模器流称为预测分析模型,而术语 “流” 用来表示 InfoSphere Streams 数据流。

示例场景

以下是集成模型开发和实时评分的整体流程:

  1. 数据分析员确定需要哪些属性和模型。
  2. Streams 组件开发人员开发出将这些属性作为输入的运算符,并生成所需的输出。
  3. Streams 应用程序开发人员构建一个能获取这些属性、调用运算符并根据评分结果采取行动的应用程序。

这是一个繁琐的过程,包含在已规划好的 Streams 流程中需要使用哪些属性的讨论。因此,我们将实现一个样例场景,在该场景中,数据分析员会评估历史数据,以生成一个有效的评分模型,然后将它传递给 Streams 开发人员,以便构建一个合适的运算符,在 Streams 环境中执行该模型。Streams 应用程序开发人员将会开发出一个微型 Streams 应用程序来验证此模型。

下一步,我们将介绍数据分析员在准备评分模型过程中的职责。


建立预测模型

确定哪些数据可用、哪些模型合适并建立模型,这些通常由数据分析员来完成。

本教程是建立在 SPSS Modeler Client 附带的 Market Basket Analysis 演示和教程基础上。我们会演示如何在 InfoSphere Streams 应用程序中发布 Modeler 中内置的模型评分流并使用它,利用 SPSS 模型的逻辑实现实时评分和决策。

Market Basket Analysis 样例将处理描述超市购物车(所购买商品的集合)的虚拟数据以及消费者的相关个人数据,这些数据可通过会员卡获得。做这些是为了找出购买同一商品的顾客群,并查看是否可以根某一特征区分他们,比如年龄、收入等。

本例演示了如何使用 IBM SPSS Modeler,通过建模和可视化,在数据库中找出倾向性或关联性。这些关联对应着数据中的案例分组,并可以通过建模来深入研究和概括这些案例分组。在零售领域,这样的顾客分组可用于以下目的,比如,提高某一商品的购买率,用以直接邮寄或根据本店顾客需求调整库存量。

建立和分析模型已超出本教程范围。请参考 SPSS Modeler Client 提供的材料,获取有关数据分析员角色和建模的大致信息。

本教程从使用中的建模器会话开始,如图 2 所示。

图 1. Basket Rule 建模器客户端
图片显示建模器客户端中的 Basket Rule STR

为了能够使用本会话中内置的预测模型,需要通过输入节点连接到评分模块,并将评分模块连接到输出平面文件,以创建一个评分分支,如下所示。请注意,本例中的分支非常小,这并不代表实际建模应用程序也没有那么复杂度。该评分模块根据两个字段,即性别和收入,来预测顾客是否倾向于购买以下商品组合:啤酒、豆类和披萨饼。

图 2. Basket Rule 评分分支
图片显示了建模器客户端中的 Basket Rule 评分分支

我们已经创建了一个输入文件样例来测试模型的执行情况。

图 3. Basket Rule 输入文件样例
图片显示了建模器客户端中的 Basket Rule 输入文件数据样例

在 Modeler 中使用用户样例输入会产生以下文件。该样例数据可用来验证 SPSS Modeler Workbench 中的评分分支,还可用来验证我们以后将建立的流操作符和应用程序。

图 4. Basket Rule 输出文件样例
图片显示了 Excel 中的 Basket Rule 输出文件数据样例

最好在创建评分分支后就保存以上已修改的流,然后使用一个临时的 Export 节点来发布在 Streams 操作符中使用评分模型所要用到的小工件。在 Modeler 会话中,将输入文件改成如下所示,并单击 Publish
请注意:请一定要指定发布元数据。这将会生成一份描述模型输入和输出,包括字段和字段类型的 XML 文档。这些信息可提供给 Streams 组件开发人员,让他们来构建调用预测模型的操作符。

图 5. Basket Rule 发布对话框
图片显示了建模器客户端的 Basket Rule 发布对话框

已完成的建模器工作台会话会出现在 streamsbasketrule.str 文件中(参见 下载)。

数据分析员与 Streams 组件开发人员之间的协作

为了编写 Streams 运算符,Streams 组件开发人员需要了解某些信息,比如预测模型的输入和输出。运算符开发人员尤其需要了解以下信息:

  • Solution Publisher 安装位置。
  • 发布过程中产生的 .pim 和 .par 文件。
  • 输入源节点键名称。这些内容可在 XML 文件中找到:
    <inputDataSources>
    <inputDataSource name="file0" type="Delimited">

    请注意:为了简便起见,样例中只支持一个输入源,而实际上对此没有技术限制。

  • 可以在 <inputDataSource> 标记中找到输入字段的名称、存储位置及其顺序:
    清单 1. 在 inputDataSource 标记中找到输入字段名称
    <fields>
      <field storage="string" type="flag">
        <name>sex</name>
      </field>
      <field storage="integer" type="range">
        <name>income</name>
      </field>
    </fields>
  • 输出源节点键名称。这些内容可在清单 2 的 XML 片段中找到:
    清单 2. 在 XML 片段中找到输出源节点键名称
    <outputDataSources>
    <outputDataSource name="file3" type="Delimited">

    请注意:为了简便起见,样例中只支持一个输出源,而实际上对此没有技术限制。

  • 可以在 <outputDataSource> 标记中找到输出字段的名称、存储位置及其顺序。
    清单 3. 在 outputDataSource 标记中找到输出字段名称
    <fields>
     <field storage="string" type="flag">
      <name>sex</name>
     </field>
     <field storage="integer" type="range">
      <name>income</name>
     </field>
     <field storage="string" type="flag">
      <name>$C-beer_beans_pizza</name>
      <flag>
       <true>
        <value>T</value>
       </true>
       <false>
        <value>F</value>
       </false>
      </flag>
     </field>
     <field storage="real" type="range">
      <name>$CC-beer_beans_pizza</name>
      <range>
       <min>
        <value>0.0</value>
       </min>
       <max>
        <value>1.0</value>
       </max>
      </range>
     </field>
    </fields>

数据分析员还会告诉 Streams 组件开发人员,该模型不会修改输入参数,即使它们与模型中列出的输出结果一样。尽管该信息不重要,但会让操作符编写者不必将这些输入重新复制到输出元组,从而实现优化。

现在我们已经发布了模型,并获得了编写运算符所需的信息,下一节将介绍 Streams 组件开发人员如何生成运算符。


编写运算符

准备编写运算符

由于 SPSS Modeler 在 Windows® 上运行,而 Streams 在 Linux 系统上运行,因此最好先验证 Linux 是否设置正确,是否可以执行数据分析员提供的 .pim 文件。对于示例场景,因为我们提供了输入文件样例,并且知道输出结果,我们可以使用作为 Solution Publisher 的一部分提供的单独的 IBM SPSS Modeler Runtime modelerrun 脚本来验证。

请注意:可在 Streams 中使用的 SPSS 补丁包不兼容 modelerrun。一旦用了 SPSS 补丁,就无法使用 modelerrun 程序。为了进行上述验证,必须在安装补丁之前使用该程序,或者将 Solution Publisher 安装到两个不同的位置,并且只将补丁打到使用 Streams 的安装版本上。

如果要在示例数据和模型上运行 modelerrun 脚本,则需要将示例文件(参见 下载)解压到 Linux 系统中安装 Solution Publisher 的位置。将目录更改为 baskrule 目录下的数据目录,并从 Solution Publisher 安装位置执行 modelerrun 脚本,如下所示。

清单 4. modelerrun 脚本
bash-3.2$
bash-3.2 cd /homes/hny1/koranda/baskruletest1/baskrule/data
bash-3.2$  ~koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2/modelerrun 
        -p baskrule.par baskrule.pim 
IBM SPSS Modeler Runtime 14.2
(C) Copyright IBM Corp. 1994, 2011
bash-3.2$

然后查看结果。

清单 5. 运行脚本的结果
bash-3.2$ cat baskrule.csv 
sex,income,$C-beer_beans_pizza,$CC-beer_beans_pizza
F,10000,F,0.988327
F,20000,F,0.989645
F,15000,F,0.988327
F,25000,F,0.989645
F,10000,F,0.988327
F,20000,F,0.989645
F,15000,F,0.988327
F,25000,F,0.989645
M,10000,T,0.838323
M,20000,F,0.990964
M,15000,T,0.838323
M,25000,F,0.990964
M,10000,T,0.838323
M,20000,F,0.990964
M,15000,T,0.838323
M,25000,F,0.990964
bash-3.2$

验证 Solution Publisher 环境工作正常之后,接下来就可以创建运算符了。

运算符特点

Streams 组件开发人员将会利用数据分析员提供的信息构建运算符。在我们的样例场景中,操作符将会接受一个元组,它包含两个属性:

  • 性别字符串,“M” 或 “F”。
  • 收入,包含一个整数值。

该操作符还会产生一个输出元组,传递所有的输入元组属性并添加以下属性:

  • 预测字符串,包括 “T” 或 “F”。
  • 信心(浮点)

我们编写了一个非泛型操作符(对特定的元组/流格式操作的运算符),它使用 Solution Publisher API 来执行 .pim 文件。参考资料 中有关于 Solution Publisher API 更多的信息。

操作符概述

使用 Solution Publisher API 的操作符的大致情况如下:

  1. 操作符初始化代码:
    1. 初始化 Solution Publisher clemrtl API 库。
    2. 打开模型映像 .pim 文件并接收映像句柄。
    3. 从备用输入和输出调用所需的映像中,获取输入和输出字段的必要信息。
    4. 定义一个函数,用于映射模型中所需的输入元组属性和输出字段,并在映像中注册。
    5. 定义一个函数,用于映射模型生成的输出字段,并在映像中注册。
    6. 由于此映像将会在不改变参数的情况下执行相同映像,因此在初始化阶段就要准备好映像。
  2. 在运算符的处理方法中(每个输入元组调用):
    1. 从输入元组属性中填充注册为替代输入的数据结构。
    2. 执行映像
    3. 从注册为替代输出的数据结构中填充输出元组属性。
  3. 在操作符的 prepareToShutdown 方法中:
    1. 关闭映像。

我们会将所有内容查看一遍,重点介绍 Streams 工件中所需的代码。这些工件的完整清单位于 baskrule.zip 文件的 baskrule/MyOp/MyOp.xml、baskrule/MyOp/MyOp_h.cgt 和 baskrule/MyOp/MyOp_cpp.cgt 文件中。

操作符模型

在操作符的 XML 模型依赖关系部分,必须指定 Solution Publisher 代码位置。

清单 6. 依赖关系部分
<libraryDependencies>
  <library>
    <cmn:description>spss-lib</cmn:description>
    <cmn:managedLibrary>
      <cmn:libPath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2</cmn:libPath>
      <cmn:includePath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2/clemrtl/inc
        lude</cmn:includePath>
    </cmn:managedLibrary>
  </library>
</libraryDependencies>

在 baskrule/MyOp/MyOp.xml 中可以看到完整的运算符模型。

运算符初始化代码

在调用任何 Solution Publisher API 之前,运算符需要先动态加载 Solution Publisher API 并初始化入口点。对于此运算符而言,需要在 _h.cgt 文件中包含以下类型的定义(参见 baskrule/MyOp/MyOp_h.cgt 中完整的已定义入口点列表)。

清单 7. _h.cgt 文件中的定义
libclemertl = dlopen("libclemrtl.so", RTLD_NOW | RTLD_DEEPBIND);
    if(!libclemertl) 
        throw SPLRuntimeOperatorException(getThisLine(), dlerror());   
/* get the routine addresses */ 
clemrtl_initialise_ext = (clemrtl_initialise_ext_t) 
    dlsym(libclemertl,"clemrtl_initialise_ext");
    if(!clemrtl_initialise_ext)
         throw SPLRuntimeOperatorException(getThisLine(), dlerror());

这段加载代码对所有调用 Solution Publisher 的运算符都是一样的,而且不用修改就可以调用不同的模型或数据。

然后在 _cpp.cgt 文件中操作符的构造函数中加载一些库。

清单 8. _cpp.cgt 文件
typedef int (*clemrtl_initialise_ext_t)(unsigned, int, void*);
...void * libclemertl;
  clemrtl_initialise_ext_t clemrtl_initialise_ext;

初始化 Solution Publisher clemrtl API 库

第一个 Solution Publisher API 是使用 clemrtl_initialise_ext() 初始化库所必需的。此初始化函数需要使用 Solution Publisher 安装目录的一个参数。因此我们创建一个运算符参数,可以在 spl 文件中设置该参数。默认位置是标准的 Solution Publisher 安装位置,该位置可以位于 baskrule/MyOp/MyOp_cpp.cgt 文件中。

清单 9. baskrule/MyOp/MyOp_cpp.cgt 文件
rstring installationDirectory = "/opt/IBM/SPSS/ModelerSolutionPublisher/14.2";
    if(hasParameter("SP_Install"))
      installationDirectory = getParameter("SP_Install");
    SPLLOG(L_INFO, "About to clemrtl initialise using SP_Install of:"<< 
        installationDirectory, "MPK");
    clemrtl_init_arg args[] = {
      {"installation_directory", installationDirectory.c_str()},
    };
    const int arg_count = sizeof args / sizeof args[0];
    if (clemrtl_initialise_ext(0,arg_count,args) != CLEMRTL_OK) {
        SPLLOG(L_ERROR, "Clemrtl initialise failed", "MPK");
    }

打开映像

使用 clemrtl_openImage() 打开一个映像并接收映像句柄。该 API 需要数据分析员提供 .pim 和 .par 文件位置。现在我们已经对 .pim 和 .par 文件名进行硬编码,并认为它们就存放在数据目录中。更常见解决方案是将它们添加成为传递给运算符的参数,添加方式和指定 Solution Publisher 安装位置的方式一样。

清单 10. 打开映像
/* open the image */
  int res, status = EXIT_FAILURE;
  image_handle = 0;
     
  res = clemrtl_openImage("baskrule.pim","baskrule.par", &image_handle);
  if (res != CLEMRTL_OK) {
		status = EXIT_FAILURE;
		SPLLOG(L_ERROR, "Open Image Failed", "MPK");
		displayError(res, 0);
	}

定义 displayError 例程以获取特定错误的额外的详细信息,并通过抛出运行时异常来终止运算符。

获取输入和输出字段信息

使用 clemrtl_getFieldCount()getFieldTypes() 获取关于输入和输出字段和类型的信息。请注意,keykeyOut 字段必须从 XML 元数据中获得数据分析员提供的值。在我们的样例中,必须使用 <inputDataSource name="file0" type="Delimited"><outputDataSource name="file3" type="Delimited">inputDataSources> 名称。

清单 11. 获取输入和输出字段信息
/* Get Input field count and types */
    char* key="file0";
  res = clemrtl_getFieldCount(image_handle, key, &fld_cnt );
  if (res != CLEMRTL_OK) {
  	status = EXIT_FAILURE;
	SPLLOG(L_ERROR, "Get Field Count Failed", "MPK");
	displayError(res, image_handle);
  }
  SPLLOG(L_INFO, "Field Count is:"<<(int)fld_cnt, "MPK");
		
  int fld_types[100]; // needs to be bigger if more than 100 fields expected
  res = clemrtl_getFieldTypes(image_handle, key, fld_cnt, fld_types );
  if (res != CLEMRTL_OK) {
  	status = EXIT_FAILURE;
  	SPLLOG(L_ERROR, "Get Field Types Failed", "MPK");
  	displayError(res, image_handle);
  }
  field_proc(fld_cnt,fld_types); 

  /* Get Output field count and types */
  size_t fld_cnt_out;
  char* keyOut="file3";
  res = clemrtl_getFieldCount(image_handle, keyOut, &fld_cnt_out );
  if (res != CLEMRTL_OK) {
  	status = EXIT_FAILURE;
  	SPLLOG(L_ERROR, "Get Output Field Type Failed", "MPK");		
  	displayError(res, image_handle);
  }
  SPLLOG(L_INFO, "Output Field Count is:"<<(int)fld_cnt_out, "MPK");
	
  int fld_types_out[100]; // needs to be bigger if more than 100 fields expected
  res = clemrtl_getFieldTypes(image_handle, keyOut, fld_cnt_out, fld_types_out );
  if (res != CLEMRTL_OK) {
  	status = EXIT_FAILURE;
  	SPLLOG(L_ERROR, "Get Output Field Types Failed", "MPK");
  	displayError(res, image_handle);
  }
  field_proc(fld_cnt_out,fld_types_out);

定义输入函数

Solution Publisher API 可定义一个函数,并在执行过程中调用它,以便提供模型中使用的数据。该函数将用来映射模型中所需的输入元组属性和输入字段。我们使用 clemrtl_setAlternativeInput() 在映像中进行注册。在注册的时候,需要传递包含指向每个输入字段的指针的输入字段结构地址。

清单 12. 定义输入函数
void** MY_OPERATOR::next_record(void* arg) {
	  SPLLOG(L_INFO, "In next_record iterator", "MPK");
    return *((buffer*) arg)->next_row++;
}

输入数据的结构和指针都在头文件 _h.cgt 中定义。

清单 13. 输入数据的结构和指针
typedef struct {
	  void** row[2];
	  void*** next_row;
	} buffer;
...buffer myBuf;

在本例中,我们使用了硬编码,行数组大小为 2,以便允许处理单个输入元组,第二行包含 NULL,表示没有后续行了。

指向输入字段的 void* 指针输入数组被分配为大小为输入字段数的 void* 指针数组(如 XML 源文件中所示),而且应该匹配之前 clemrtl_getFieldCount(image_handle, key, &fld_cnt ); 调用返回的输入字段。

清单 14. 初始化缓冲区
/* initialize the buffer */
    void* inPointers[2];
    myBuf.row[0] = (void**) &inPointers;
    myBuf.row[1] = NULL;
    myBuf.next_row = myBuf.row;

然后 setAlternativeInput 调用会传递对应于模型输入部分的键、之前获得的输入字段数和类型、函数地址和输入数据结构地址。

清单 15. setAlternativeInput 调用
res = clemrtl_setAlternativeInput(image_handle, key, fld_cnt, fld_types, 
     MY_OPERATOR::next_record, (void*) &myBuf );
if (res != CLEMRTL_OK) {
  status = EXIT_FAILURE;
  SPLLOG(L_ERROR, "Set Alternative Input Failed", "MPK");
  displayError(res, image_handle);
}

定义输出函数

Solution Publisher API 可定义一个输出迭代器函数,并在执行过程中调用它,以获取模型中生成的数据。该函数将用于映射模型生成的输出字段和输出元组属性。我们使用 clemrtl_setAlternativeOutput() 在映像中进行注册。完成注册之后,就可以在执行的时候将一个对象暗中传递给该函数。在本例中,我们将传递一个包含输出缓冲区链接列表的结构,在函数每次迭代时都会生成该列表。随后,会使用此链接列表将输出字段数据移到输出元组,然后将它发送到输出端口。

清单 16. 输出函数
void MY_OPERATOR::next_record_back(void* arg, void** row) {
	  outField* ofp;
	  ofp = (outField*) arg;
	  
	  outBuffer* obp;
	  obp = (outBuffer*) new outBuffer;
	  obp->next=0;
	  if (ofp->head == 0) { // first call
	    ofp->head = obp;
	    ofp->tail = obp;
	  } else {
	    (ofp->tail)->next = obp;
	    ofp->tail = obp; 
	  } 
	  
	    if (row[0]) {
        obp->sex = (const char *) row[0];
        obp->_missing_sex = false;
      } else {obp->_missing_sex = true;}
      if (row[1]) {
        obp->income = *((long long *) row[1]);
        obp->_missing_income = false;
      } else {obp->_missing_income = true;}
      if (row[2]) {
        obp->prediction = (const char *) row[2];
        obp->_missing_prediction = false;
      } else {obp->_missing_prediction = true;}
      if (row[3]) {
        obp->confidence = *((double *) row[3]);
        obp->_missing_confidence = false;
      } else {obp->_missing_confidence = true;}
}

模型中输出数据的结构是在头文件中定义的。

清单 17. 输出数据的结构
typedef struct {
  void* next;
  const char* sex;
  boolean _missing_sex;
  long long income;
  boolean _missing_income;
  const char* prediction;
  boolean _missing_prediction;
  double confidence;
  boolean _missing_confidence;
} outBuffer;

数据分析员在 XML 元数据中提供关于输出字段的信息的作用就是定义此结构。存储值是用于确定所返回结构中的数据类型,而所需的代码会将该值从返回的输出行移动到结构的内存中。而且数据分析员提供的 XML 元数据中会包含结构中的字段以及所返回的行的顺序的描述。

在输出迭代函数中,我们会通过测试来确保模型会生成一些值。字段返回的地址表示丢失的值。我们需要确认在此结构所定义的对应的 _missing_xxx 字段中丢失一些值。之后,会在填充输出元组时用到这些值。

setAlternativeOuput 调用会传递之前获取的模型输出部分对应的键、输出字段数和类型、函数地址和保存管理模型中产生的数据链接列表的结构指针。

清单 18. 替代的输出
/* Set the alternative output */
res = clemrtl_setAlternativeOutput(image_handle, keyOut, fld_cnt_out, fld_types_out, 
     MY_OPERATOR::next_record_back, (void*) &myOutField );
if (res != CLEMRTL_OK) {
  status = EXIT_FAILURE;
  SPLLOG(L_ERROR, "Set Alternative Output failed", "MPK");
  displayError(res, image_handle);
}

准备映像

由于该映像会在不改变参数的情况下执行多次,因为我们要使用 clemrtl_prepare(),它会在每个元组执行时减少处理过程。

清单 19. 准备映像
/* prepare the image */
res = clemrtl_prepare(image_handle);
if (res != CLEMRTL_OK) {
  status = EXIT_FAILURE;
  SPLLOG(L_ERROR, "Prepare Failed", "MPK");
  displayError(res, image_handle);
}

操作符处理方法

填充输入字段

当获得每个输入元组之后,会在运算符的处理方法中处理它。首先,在 setAlternateInput () 过程中,会在输入元组属性中填充已定义的数据结构。请注意,这就是数据分析员在 XML 元数据中提供关于输入字段的信息的用意。该存储值用于定义要填充的结构中的数据类型,而且流中的属性需要匹配模型所需的存储。在本例中,元组属性 s_sex 定义为 rstring。一旦在输入元组中找到,就使用 .c_str() 方法来获取底层的 C 引用。这会匹配第一个输入字段所需的存储,如 <field storage="string"type="discrete"><name>sex</name> 所示。

同样,元素属性 s_income 定义为 int64。一旦找到,就用属性的 getter 方法返回底层存储的引用。这会匹配第二个输入字段所需的存储,如 <field storage="integer" type="range"><name>income</name> 所示。请注意,此行中填充的字段顺序就是数据分析员在 XML 元数据中提供的顺序。

清单 20. 填充的数据
myBuf.row[0] [0] = (void*) (tuple.get_s_sex().c_str());
myBuf.row[0] [1] = (void*) &(tuple.get_s_income());

myBuf.next_row = myBuf.row; //set to point to the new data

执行映像

使用 clemrtl_execute() 执行映像时请注意,我们使用同一个 displayError 例程(它将抛出一个异常来终止运算符),因此如果执行过程中发生错误,则会终止运算符,不会再进行评分。记录下单个元组的错误并忽略,这样做可能更合适一些。或者我们可以通过通过将错误写入另外的用于处理错误的输出端口来表示错误,或者在附加的属性中表示模型执行错误或结果不正确。但是,为了简便起见,本例直接终止了运算符。

清单 21. 执行映像
/* execute the image */
  res = clemrtl_execute(image_handle);
  if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Execute Failed", "MPK");
    displayError(res, image_handle);
  }

模型执行成功后,myOutField 会包含返回的输出数据的链接列表头部。系统会遍历此列表,并且对于每个元素,都会使用在输出迭代函数执行过程中复制的数据来填充输出元素属性。然后会将此元组会提交到输出端口。用户可以看到,如果模型未生成输出内容(链接表是空的),则不会发送输出元组。如果模型对一个输入生成多组输出值,则会将多个元组发送到输出流。此外,这些属性定义的类型必须匹配从 XML 元数据中派生的 outBuffer 结构中的类型。在本例中,我们将属性定义为 rstring predLabelfloat64 confidence

丢失值将会导致执行清空操作后元组的输出属性位于默认值左侧。请注意,这些默认值实际上可能是模块返回值域的一部分,因此,可能需要根据模型采用不同的值或不同的处理方式。

清单 22. 处理输出数据列表
outBuffer* currentOutBuf = (outBuffer*)(myOutField.head);
    while (currentOutBuf) {
      otuple.clear(); //reset all attributes to their default values. 
      otuple.assignFrom(tp, false); // move input fields to output tuple

      /* Dig out what was returned and add to output tuple */
      if (currentOutBuf->_missing_prediction == false) 
        otuple.set_predLabel(currentOutBuf->prediction);
      if (currentOutBuf->_missing_confidence == false) 
        otuple.set_confidence(currentOutBuf->confidence);
  
      submit(otuple, 0);
   	
      /* move to next output buffer in list */    	 
      outBuffer* nextPtr = (outBuffer*)(currentOutBuf->next);
      delete currentOutBuf; // free the memory allocated in the output iterator
      currentOutBuf = nextPtr;
    } // end of while loop

操作符 prepareToShutdown 方法

关闭映像

当告知操作符需要关闭其自身时,它会使用 clemrtl_closeImage(), 来关闭映像,可以正常关闭。

清单 23. 关闭映像
/* close the image */
  res = clemrtl_closeImage(image_handle);
  if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Close Image Failed", "MPK");
    displayError(res, image_handle);
  }

现在已经完成了评分运算符的开发。下一步,我们将看看 Streams 应用程序开发人员如何在 Streams 应用程序中使用此运算符。


使用运算符

在 InfoSphere Streams 应用程序中使用评分模型运算符

将评分模型集成到 Streams 应用程序中通常由 Streams 应用程序开发人员来完成。在实际的 Streams 应用程序中,输入数据可能来自一个或多个连续的数据流,而运算符的结果会同时产生一个元组信息。对于本教程,我们会通过使用包含评分行的文件来进行模拟,并使用 InfoSphere Streams FileSource 运算符来读取信息并生成这些元组的流。

在实际的流应用程序中,将会由后续的流应用程序来处理输出的分数,并将它们写入外部系统或者保存到历史数据中。在这里,我们所用的模拟方法是使用 InfoSphere Streams FileSink 运算符每次写入一个元组,通过这种方式将评分元组写入一个输出文件中。

运算符所需的输入和输出内置在运算符中,这些信息需要由组件开发人员告诉应用程序开发人员。本例中,运算符需要两个输入属性:s_sex(rstring 值 T 或 F)和 s_income(int64 值),并生成两个输出属性:prediction(rstring 值 T 或 F,表示此输入是否指示购买啤酒、豆类和披萨的偏好)和 confidence(float64 值,表示预测的信心值)。

运行 SPL 应用程序样例

要求和安装

  1. 为了能构建和运行应用程序样例,需要一个运行正常的 InfoSphere Streams 环境。
  2. 需要在此环境中安装 IBM SPSS Modeler Solution Publisher Runtime 14.2 fixpack 1 和 Solution Publisher hot fix(预计于 2011 年 10 月 14 日发布)。
  3. 还要保证在部署 Streams 的所有系统上设置 LD_LIBRARY_PATH,以确保包含必要的 Solution Publisher 库。

LD_LIBRARY_PATH 要求

假设 Solution Publisher 安装在 $INSTALL_PATH 中,LD_LIBRARY_PATH 需要包含以下内容:

  • $INSTALL_PATH
  • $INSTALL_PATH/ext/bin/*
  • $INSTALL_PATH/jre/bin/classic
  • $INSTALL_PATH/jre/bin

包含在 IP 文件 dlibrarypath.sh 中的脚本是用于正确设置路径的。如果 Solution Publisher 没有安装在默认路径下,在使用脚本前,需要修改脚本第一行,使其指向 Solution Publisher 安装目录。例如,如果 Solution Publisher 安装在 /homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2 中,则将脚本第一行设置为 CLEMRUNTIME=/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2。

样例目录

ZIP 文件样例(参见 下载)包含了来自 Market Basket Analysis 样例的 .pim、.par 和 XML 文件,并加入了评分分支、输入样例和预计输出文件,还有一个完整的 Streams Programming Language 应用程序,它包含对 Market Basket Analysis 模型评分的自定义运算符。

在 baskrule/MpkSimpleSpss.spl 中提供了一个 SPL 应用程序样例,如图 6 所示。

图 6. SPL 应用程序
图片显示了包含 FileSource、MyOp 运算符和 FileSink 的 SPL 应用程序

调整并编译样例

要运行 SPL 应用程序样例,请将 baskrule.zip 文件(见 下载)解压到安装 InfoSphere Streams 和 Solution Publisher 的 Linux 系统。如果 Solution Publisher 安装路径不是默认路径 /opt/IBM/SPSS/ModelerSolutionPublisher/14.2,则需要修改 MyOp 目录中的运算符 XML 文件 (MyOp.xml)。修改 libPathincludePath 条目,使它们与 Solution Publisher 安装位置相匹配。

清单 24. libPathincludePath 条目:
<cmn:libPath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2</cmn:libPath>
<cmn:includePath>/opt/IBM/SPSS/
    ModelerSolutionPublisher/14.2/clemrtl/include</cmn:includePath>

还需要修改 MpkSimpleSpss.spl 文件,在运算符的调用中添加 SP_Install 参数。

清单 25. 添加 SP_Install 参数
stream<DataSchemaPlus> scorer = MyOp(data){
    param SP_Install:"/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2";
  }

编译样例

要将此样例编译成单独的 Streams 应用程序,请将目录更改为解压项目样例 (baskrule) 的位置,并运行 make 命令,如下所示。

清单 26. 运行 make 命令
bash-3.2$ cd STSPTest/baskrule/
bash-3.2$ make
/homes/hny1/koranda/InfoSphereStreams64/bin/sc -a -T -M Main 
Creating types...
Creating functions...
Creating operators...
Creating PEs...
Creating standalone app...
Creating application model...
Building binaries...
 [CXX-type] tuple<rstring s_sex,int64 s_income>
 [CXX-operator] data
 [CXX-operator] scorer
 [CXX-type] tuple<rstring s_sex,int64 s_income,rstring predLabel,float64 confidence>
 [CXX-operator] Writer
 [CXX-pe] pe0
 [CXX-standalone] standalone
 [LD-standalone] standalone
 [LN-standalone] standalone 
 [LD-pe] pe0 
bash-3.2$

执行样例

要执行应用程序,确保已设置 LD_LIBRARY_PATH。清单 27 显示了设置路径并回显路径以验证是否设置了正确的命令。

清单 27. 设置路径
bash-3.2$ source ldlibrarypath.sh 
bash-3.2$ echo $LD_LIBRARY_PATH
/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2:/homes/hny1/koranda/IBM
/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/pasw.adp:/homes/hny1/koranda/IBM/SPSS/
ModelerSolutionPublisher64P/14.2/ext/bin/pasw.alm:/homes/hny1/koranda/IBM/SPSS/Modeler
SolutionPublisher64P/14.2/ext/bin/pasw.bagging:/homes/hny1/koranda/IBM/SPSS/ModelerSo
lutionPublisher64P/14.2/ext/bin/pasw.boosting:/homes/hny1/koranda/IBM/SPSS/ModelerSolu
tionPublisher64P/14.2/ext/bin/pasw.cognos:/homes/hny1/koranda/IBM/SPSS/ModelerSolutio
nPublisher64P/14.2/ext/bin/pasw.common:/homes/hny1/koranda/IBM/SPSS/ModelerSolution
Publisher64P/14.2/ext/bin/pasw.me:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublish
er64P/14.2/ext/bin/pasw.netezzaindb:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPubl
isher64P/14.2/ext/bin/paswneuralnet:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPubli
sher64P/14.2/ext/bin/pasw.outerpartition:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionP
ublisher64P/14.2/ext/bin/pasw.pmmlmerge:/homes/hny1/koranda/IBM/SPSS/ModelerSolutio
nPublisher64P/14.2/ext/bin/pasw.psm:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPub
lisher64P/14.2/ext/bin/pasw.scoring:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPubli
sher64P/14.2/ext/bin/pasw.split:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher
64P/14.2/ext/bin/pasw.transformation:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPub
lisher64P/14.2/ext/bin/pasw.tree:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher
64P/14.2/ext/bin/pasw.vi:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.
2/ext/bin/pasw.xmldata:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2
/ext/bin/spss.bayesiannetwork:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher
64P/14.2/ext/bin/spss.binning:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher6
4P/14.2/ext/bin/spss.C5:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.
2/ext/bin/spss.inlinecsp:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.
2/ext/bin/spss.knn:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ex
t/bin/spss.modelaccreditation:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher6
4P/14.2/ext/bin/spss.modelevaluation:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPub
lisher64P/14.2/ext/bin/spss.predictoreffectiveness:/homes/hny1/koranda/IBM/SPSS/Model
erSolutionPublisher64P/14.2/ext/bin/spss.predictorstat:/homes/hny1/koranda/IBM/SPSS/M
odelerSolutionPublisher64P/14.2/ext/bin/spss.propensitymodelling:/homes/hny1/koranda/I
BM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.psmmodel:/homes/hny1/koranda
/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.selflearning:/homes/hny1/kora
nda/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.svm:/homes/hny1/koranda
/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.xd:/homes/hny1/koranda/IBM/S
PSS/ModelerSolutionPublisher64P/14.2/jre/bin/classic:/homes/hny1/koranda/IBM/SPSS/Mo
delerSolutionPublisher64P/14.2/jre/bin
bash-3.2$

在下一步中,将目录更改为 baskrule 下的数据目录并执行独立的应用程序,如清单 28 所示。

清单 28. 执行独立的程序
bash-3.2$ cd data/
bash-3.2$ ../output/bin/standalone

您将看到很多与以下处理过程类似的信息。请注意,为了便于阅读,这里删除了时间戳、过程、类、方法和行信息。例如, 09 Aug 2011 17:11:07.181 [21715] INFO spl_pe M[PEImpl.cpp:process:483] - Start processing...

清单 29. 提示信息
bash-3.2$ ../output/bin/standalone
- Start processing...
- About to clemrtl initialise using SP_Install of: 
        "/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2"
- After clemrtl initialise
- Major=14 Minor=2 Release=0 build=0
- Image Handle Retrieved: 1
- About to get field count
- Field Count is: 2
- About to get Field Types
- Field Type 0 is: STRING
- Field Type 1 is: LONG
- About to get Output Field Count
- Output Field Count is: 4
- About to get output field types
- Field Type 0 is: STRING
- Field Type 1 is: LONG
- Field Type 2 is: STRING
- Field Type 3 is: DOUBLE
- About to set alternative Input
- After Set Alternative input
- About to set alternative output
- After Set Alternative Output
- About to prepare
- After Prepare
- Leaving Constructor
- Opening ports...
- Opened all ports...
- Creating 1 operator threads
- Created 1 operator threads
- Processing tuple from input port 0 {s_sex="F",s_income=10000}
- Joining operator threads...
- About to execute the image
 - In next_record iterator
- In next_record iterator
- After Execute
- Sending tuple to output port 0 
        {s_sex="F",s_income=10000,predLabel="F",confidence=0.988327}
- Processing tuple from input port 0 {s_sex="F",s_income=20000}
- About to execute the image
- In next_record iterator
- In next_record iterator
- After Execute
- Sending tuple to output port 0 
        {s_sex="F",s_income=20000,predLabel="F",confidence=0.989645}
...
      similar lines for other input tuples omitted
...
- Joined all operator threads...
- Joining window threads...
- Joined all window threads.
- Joining active queues...
- Joined active queues.
- Closing ports...
- Closed all ports...
- Notifying operators of termination...
- About to Close Image with handle1
- After Close Image
- Notified all operators of termination...
- Flushing operator profile metrics...
- Flushed all operator profile metrics...
- Deleting active queues...
- Deleted active queues.
- Deleting input port tuple cache...
- Deleted input port tuple cache.
- Deleting all operators...
- Deleted all operators.
- Terminating...
- Completed the standalone application processing
- Leaving MyApplication::run()
- Shutdown request received by PE...
- shutdownRequested set to true...

可以在由 FileSink 创建的 mpkoutput.csv 文件中查看结果。

清单 30. mpkoutput.csv
bash-3.2$ cat mpkoutput.csv 
"F",10000,"F",0.988326848249027
"F",20000,"F",0.989645351835357
"F",15000,"F",0.988326848249027
"F",25000,"F",0.989645351835357
"F",10000,"F",0.988326848249027
"F",20000,"F",0.989645351835357
"F",15000,"F",0.988326848249027
"F",25000,"F",0.989645351835357
"M",10000,"T",0.838323353293413
"M",20000,"F",0.990963855421687
"M",15000,"T",0.838323353293413
"M",25000,"F",0.990963855421687
"M",10000,"T",0.838323353293413
"M",20000,"F",0.990963855421687
"M",15000,"T",0.838323353293413
"M",25000,"F",0.990963855421687
bash-3.2$

成功了!

您已经了解如何编写一个从 Streams 应用程序调用 SPSS 模型的运算符。对您来说,下一步要做的是修改适配器样例,以便将它用于特定的模型和数据。


使用不同的模型

针对不同的模型进行调整

为了调整运算符样例,以便将它用于不同的输入元组/模型,您需要在以下位置进行调整:

  1. _h.cgt file — 调整输出结构
  2. _cpp.cgt 文件 ...
    1. next_record_back ...
      1. 调整输出结构
    2. 构造函数 ...
      1. 调整 .pim 和 .par 文件的名称和位置
      2. 调整输入和输出文件标记
      3. 调整输入字段指针数组大小
    3. 过程 ...
      1. 调整加载输入结构代码
      2. 调整加载输出元组代码

示例代码中已经注释掉了可能需要调整的部分。参见清单 31 中的示例。

清单 31. 可能需要调整的部分
/***************************************************************
* the following needs to be adjusted to match the pim and par  *
* file locations.Unqualified gets from data directory.       *
***************************************************************/

     res = clemrtl_openImage("baskrule.pim","baskrule.par", &image_handle);

类型匹配

让集成能够成功完成的一个重要方面是保证元组属性兼容模型字段所需的存储。

在 SPSS Modeler 的定义模型中,Solution Publisher 文档描述了从 Modeler 类型到其典型的 C 声明符之间的映射。例如模型中类型 STRING 会被解释为 UTF-8 类型的以 null 结尾的字符串,声明为 const char*,而模型中类型 LONG 会被解释为 64 位有符号整数,声明为 long long

InfoSphere Streams 文档提供了一个描述如何将 SPL 类型映射为等价的 C++ 类型的表格。例如,SPL rstring 映射为 SPL::rstring,它是 C++ std::string 基类的实现,SPL int64 映射为 SPL::int64,它是 C++ int64_t 基类的实现。

以下表格描述了所有 Modeler 类型到 Streams SPL 类型的映射。

表 1. 类型映射
Modeler 类型XML 元数据标记SP 字段类型(从 PIM 返回)C 声明(来自 SP 文档)SPL 类型
StringstringSTRINGconst char *rstring
IntegerintegerLONGlong longint64
RealrealDOUBLEdoublefloat64
TimetimeTIMElong longint64
DatedateDATElong longint64
TimestamptimestampTIMESTAMPlong longint64

结束语

结束语

本教程演示了如何完整地执行 SPSS Modeler 预测分析(符合 Solution Publisher API 限制,可以一次执行一个元组)还深入讲解了如何调整运算符样例以处理不同的模型。

请注意,还有其他通过 PMML 和 Streams Mining 工具包在 InfoSphere Streams 中执行评分模型的方法。与 Mining Toolkit 的 PMML 集成所支持的模型相比,此处介绍的直接打包技术和通过 Solution Publisher 接口集成 SPSS 模型集成的方法可以将评分扩展至更多的模型。

未来的可能性

对此进行扩展的可能性:

  • 生成泛型运算符,映射代码消除了参数和 XML 元数据,并允许整合模型,无需修改运算符的 C++ 模板代码(参阅本系列文章的 第 2 部分)。
  • 在执行失败或数据格式不正确时,提供可定制的错误行为。
  • 为动态更新模型提供更好的支持。您可以停止或重启运算符,这会导致重新加载 .pim 和 .par 文件, 但这类似于为工具包建模,有一个可选的输入端口来支持新模型,并让运算符管理模型的替换。

下载

描述名字大小
流操作符样例、程序、模型和数据baskrule.zip25KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=784265
ArticleTitle=在 InfoSphere Streams 中集成 SPSS Model Scoring,第 1 部分: 从 InfoSphere Streams 运算符调用 Solution Publisher
publish-date=01092012