从 IBM InfoSphere Streams 调用 Python 代码

编程语言是企业开发人员快速组装有效的解决方案的一种流行选择。许多公司采用 Python 为日常用途构建 IT 资产。IBM InfoSphere® Streams 是一个新颖的中间件产品,旨在直接使用 C++ 和 Java™ 技术实现逻辑。还可以在一个 Streams 应用程序的上下文内调用 Python 代码。学习如何直接从 IBM InfoSphere Streams 应用程序调用 Python 代码。

Senthil Nathan, 高级技术人员, IBM TJ Watson Research Center

Senthil NathanSenthil Nathan 是位于纽约 Yorktown Heights 的 IBM T.J. Watson Research Center 的一名高级技术人员。他在为不同技术领域构建软件方面拥有 28 年的丰富经验,这些领域包括 SOA、Web 服务、Java Enterprise Edition、PHP、Web 2.0 和 Ajax。他将全部工作精力都放在 IBM InfoSphere Streams 产品和它的商业应用上。



Glenn Hochberg, 首席技术研究员, AT&T

Glenn HochbergGlenn Hochberg 是位于新泽西州 Florham Park 的 AT&T's Chief Security Office 的一名首席技术研究员。他使用各种语言和技术来开发软件已有近 35 年的时间,包括使用汇编语言的通信协议、使用 C++ 的事务服务器、VoIP、使用 Java 和 VXML 的 IVR 应用程序,以及使用 Python 的数据分析工具。他目前是 AT&T's Internet Data Security Analysis 平台 (Flood) 的首席开发人员。



2013 年 10 月 18 日 (最初于 2013 年 4 月 05 日)

概述

IBM InfoSphere Streams 是一个高性能的实时事件处理中间件。它独特的优势在于能够从各种不同的数据来源获取结构化和非结构化数据,以用于执行实时分析。它通过将一种称为 SPL(Streams Processing Language,流处理语言)的易于使用的应用程序开发语言与一个分布式运行时平台相结合来完成此任务。这个中间件还提供了一个灵活的应用程序开发框架,将使用 C++ 和 Java 编写的代码集成到 Streams 应用程序中。除了 C++ 和 Java,许多构建真实 IT 资产的开发人员还使用动态编程语言。借助它在系统集成功能上的优势,Python 是许多公司快速构建解决方案的一种可行选择。对于现有资产是使用 Python 编写的公司,可通过一种方式将 Python 代码集成到 Streams 应用程序内。本文通过一个简单的 Streams 应用程序示例介绍实现该目的的详细信息。

本文假设您熟悉 InfoSphere Streams 和它的 SPL 编程模型。您需要理解编程技术,还需要拥有实用的 C++ 和 Python 知识。有关 InfoSphere Streams 和 Python 的深入细节,请参阅 参考资料

InfoSphere Streams 是 IBM 的大数据平台战略的一个重要组件。IBM 的许多拥有 Python 资产和技能的当前和潜在客户都可以利用它,将它与 InfoSphere Streams 结合使用。本文面向那些技术关注点为大数据应用程序的读者,包括应用程序设计人员、开发人员和架构师。


示例场景

为了解释从一个 Streams 应用程序调用 Python 所涉及的技术的真实细节,我们将坚持使用一个简单的例子。此场景涉及到从一个输入 CSV 文件读取一些 Web 地址的名称,调用用户编写的一个简单 Python 函数,该函数将返回以下细节作为结果。我们然后将每个 Web 地址的结果写入一个单独的输出 CSV 文件中:

  • URL 的主要主机名
  • URL 的备用主机名
  • URL 的 IP 地址列表
  • URL 字符串中指定的公司名称

前提条件

下面使用的代码段解释了上述场景的实现细节。您也可以 下载 此示例代码,在自己的 IBM InfoSphere Streams 安装上运行它。示例代码已在以下环境进行了测试:

  • RedHat Enterprise Linux 6.1 或更高版本(或等效的 CentOS 版本)
  • gcc 版本 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)
  • Python 2.6.6(r266:84292, Apr 11 2011, 15:50:32,随 RHEL6 一起发布)
  • /usr/lib/libpython2.6.so
  • 包含 Python.h 和其他包含文件的 /usr/include/python2.6 目录
  • 配置了一个有效的 Streams 实例的 IBM InfoSphere Streams 3.x

只要对代码或环境设置稍作调整,相同的技术也适用于稍微不同的环境(比如 RHEL 5.8 和 Streams 2.0.0.4)。


高级应用程序组件

在我们的简单示例场景中,有 3 个主要组件。由于每个组件中使用的编程语言的自然独立性,每个组件都是独立的,可放入自己的项目中:

  • UrlToIpAddress Python 脚本
  • StreamsToPythonLib C++ 脚本
  • streams-to-python SPL 脚本

UrlToIpAddress 是一个 Python 脚本,包含使用 Python API 获取给定 Web 地址的 IP 地址和主机名信息的简单逻辑。此脚本可使用 Python 解释程序独立测试。本文展示了如何从 Streams 应用程序调用 Python 脚本中的函数的过程中,这段简短的脚本发挥着重要作用。

StreamsToPythonLib 是一个 C++ 项目。在它内部,包含 SPL 原生函数的源代码。大体上讲,这里的源代码在执行 C++ 代码期间使用了 Python/C API 嵌入到 Python 代码中。Python 文档中详细介绍了如何将 Python 嵌入到 C++ 代码中。此项目包含一个包装器包含 (.h) 文件,这个文件非常重要,提供了一个 Streams SPL 应用程序调用任何 C++ 类方法的入口点。此项目中的所有 C++ 逻辑都将编译到一个共享对象库 (.so) 文件中,以供 SPL 应用程序使用。

streams-to-python 是一个 Streams SPL 项目。在它内部,我们提供了一个基本的 SPL 流向图来建立一个调用链 (SPL<-->C++<-->Python)。这段 SPL 代码从数据目录中一个输入文件读取 URL,调用 C++ 原生函数来执行 Python 代码,接收结果,然后将结果写入数据目录中的一个输出文件。在 SPL 项目目录中,一个原生函数模型 XML 文件列出了从 SPL 直接调用一个 C++ 类方法所需的元信息。此细节包括 C++ 包装器包含文件名、包含包装器函数的 C++ 命名空间、使用 SPL 语法/类型表达的 C++ 包装器函数、从 C++ 项目创建的共享对象库的名称、共享对象库的位置、包装器包含文件的位置,等等。

在以下各节中,我们将深入分析这 3 个应用程序组件,详细解释 Python、C++ 和 SPL 代码。


Python 逻辑

清单 1 给出了 Python 代码。这是我们希望从 Streams 调用的业务逻辑。

清单 1. UrlToIpAddress.py
import re, sys, socket

def getCompanyNameFromUrl(url):
    # Do a regex match to get just the company/business part in the URL.
    # Example: In "www.ibm.com", it will return "ibm".
    escapedUrl = re.escape(url)
    m = re.match(r'www\.(.*)\..{3}', url)
    x = m.group(1)
    return (x)

def getIpAddressFromUrl(url):
    # The following python API will return a triple
    # (hostname, aliaslist, ipaddrlist)
    # hostname is the primary host name for the given URL
    # aliaslist is a (possibly empty) list of alternative host names for the same URL
    # ipaddrlist is a list of IPv4 addresses for the same interface on the same host
    #
    # aliaslist and ipaddrlist may have multiple values separated by
    # comma. We will remove such comma characters in those two lists.
    # Then, return back to the caller with the three comma separated
    # fields inside a string. This can be done using the Python 
    # list comprehension.
    return(",".join([str(i).replace(",", "") for i in socket.gethostbyname_ex(url)])) 

if ((__name__ == "__main__") and (len(sys.argv) >= 2)):
    url = sys.argv[1]
    # print("url=%s" % (url, ))
    print "IP address of %s=%s" % (url, getIpAddressFromUrl(url))
    print "Company name in the URL=%s" % repr(getCompanyNameFromUrl(url))
elif ((__name__ == "__main__") and (len(sys.argv) < 2)):
    sys.exit("Usage: python UrlToIpAddress.py www.watson.ibm.com")

清单 1 中可以明显看到,为了保持调理清晰,我们特意让 Python 代码尽可能简单。这段代码中有两个 Python 函数,随后是一个将在使用 Python 解释程序执行 Python 脚本时运行的代码段。要验证该代码是否在按预期运行,可从一个 shell 窗口运行此脚本:python UrlToIpAddress.py www.watson.ibm.com

在文件顶部,导入了一些 Python 模块,比如正则表达式和套接字。第一个函数是 getCompanyNameFromUrl,它接受一个 Web 地址作为输入。它执行一次正则表达式匹配,以解析来自该 Web 地址的公司名称,并将该公司名称返回给调用方。下一个函数是 getIpAddressFromURL。它也接受一个 Web 地址作为输入。它调用一个 Python 套接字 API 来获取给定 Web 地址的 IP 地址。具体来讲,这个 Python API (gethostbyname) 返回一个包含 3 个元素的元组。这 3 个元素提供给定 Web 地址的服务器的主机名、备用主机名(如果有)和该服务器的一个或多个 IP 地址。无需将元组类型返回给调用方,此函数通过在每个元素后插入一个逗号,将元组中的 3 个元素扁平化为一个 Python 字符串,然后以一个字符串的形式将结果返回给调用方。

提供此示例是为了了解如何从一个 Streams 应用程序调用这两个 Python 脚本函数。以下各节将重点介绍这个过程。


C++ 逻辑

InfoSphere Streams 允许以两种方式提供使用 C++ 编写的代码。一种方式是使用 C++ 构建原始 Streams 运算符,然后合并使用 C++ 编写的业务逻辑。另一个选择是直接从 SPL 以原生函数的形式执行任何任意的 C++ 类方法。在此练习中,我们将使用原生函数方法。为此,我们将创建一个名为 StreamsToPythonLib 的单独的 C++ 项目,我们将在其中编写必要的代码,以便调用上一节中给出的 Python 函数。然后我们将创建一个共享对象 (.so) 库,以便将这些 C++ 代码提供给 Streams SPL 应用程序。

表 1 给出了 StreamToPythonLib C++ 项目目录的内容。

表 1. StreamsToPythonLib C++ 项目目录
文件说明
StreamsToPython.hC++ 类接口文件
StreamsToPython.cppC++ 类实现文件
StreamsToPythonWrappers.hC++ 包含文件,包含 Streams 原生函数代码
mk一段构建这个 C++ 项目的共享对象 (.so) 库的脚本
清单 2. StreamsToPython.h
#ifndef STREAMS_TO_PYTHON_H_
#define STREAMS_TO_PYTHON_H_

using namespace std;

// To avoid a redefinition compiler error, undefine the following.
#undef _POSIX_C_SOURCE
#undef _XOPEN_SOURCE
// This should be the first include file (according to Python documentation)
#include "Python.h"

// Include files that defines SPL types and functions.
#include "SPL/Runtime/Function/SPLFunctions.h"
#include <SPL/Runtime/Utility/Mutex.h>
#include <SPL/Runtime/Type/ValueHandle.h>
// Include standard C++ include files.
#include <sys/time.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <sstream>

// This will allow us to access the types and functions from SPL namespace.
using namespace SPL;

// Your #define constant definitions go here.

// Class definition follows here.
namespace calling_python_from_streams {
   class GlobalStreamsToPythonSession {
      private:
         // This member variable tells us if a global
         // streams to Python caller handle already
         // exists for a given PE/process.
         boolean streamsToPythonHandleExists;

         // Following member variables are required for
         // calling Python C APIs.
         static boolean pyInitialized;
         static boolean importFailed;
         PyObject* pFunc1;
         PyObject* pFunc2;

      public:
         GlobalStreamsToPythonSession();
         virtual ~GlobalStreamsToPythonSession();

         // This method establishes StreamsToPython handle for a given PE/process.
         int32 initializeStreamsToPython();
         // This method gets the IP address of a given URL.
         boolean getIpAddressFromUrl(rstring const & url,
            rstring & primaryHostName, rstring & alternateHostNames,
            rstring & ipAddressList, rstring & companyName);

         // Get the global (Singleton) Streams to Python session object.
         static GlobalStreamsToPythonSession & getGlobalStreamsToPythonSession();
   }; 
} 

#endif /* STREAMS_TO_PYTHON_H_ */

清单 2 表明它是一个 C++ 接口类。代码中首先包含 Python.h,这是我们调用原生 Python 代码的任务所必需的。它包含标准库头文件和 SPL 包含文件。一定要注意的是,通过包含 SPL 头文件和使用 SPL 命名空间,我们可以访问 C++ 内的 SPL 数据类型。SPL 中的许多原始和集合数据类型表示等效的 C++ 内置数据类型。在命名空间和类节中声明成员变量和成员方法。有一些与 Python 对象相关的成员变量,我们稍后会介绍它们。为类构造函数、析构函数和将从 SPL 调用的业务逻辑方法声明了一些原语。最后,有一个静态方法 getGlobalStreamsToPythonSession,它提供了从 SPL 代码对这个 C++ 类进行单例访问能力。我们稍后会看到所有过程的更多详细信息。

清单 3. StreamsToPython.cpp
#include "StreamsToPython.h"
#include <dlfcn.h>

namespace calling_python_from_streams {
   // Initialize the static member variables in this class.
   boolean GlobalStreamsToPythonSession::pyInitialized = false;
   boolean GlobalStreamsToPythonSession::importFailed = false;
...

参见 完整的清单 3 (StreamsToPython.cpp)

清单 3 是实现类。它首先是相应的接口类和动态库加载器的包含语句。Python 允许扩展和嵌入。在 Python 代码内部,可扩展它来调用 C 函数。类似地,在 C++ 代码内,可嵌入 Python 代码。清单 3 中的主要关注点是使用 Python/C API 调用原生 Python 代码。我们的实现类有 5 个 C++ 方法。以下内容深入分析了每种方法。

Constructor:在实例化这个类时,将完成以下 3 个主要任务:

  1. 将 Python 路径设置为当前目录。
  2. 初始化 Python 解释程序,因为必须在使用任何 Python/C API 函数之前完成此操作。
  3. 将 libPython 共享库动态地加载到我们的进程空间中。即使 Python 共享库将由动态加载器自动加载,仍然必须通过 dlopen 加载它,以便我们的 Python 脚本能够正确地链接到其他实现为共享对象库的 Python 模块。

Destructor:在离开这个类时将执行以下清理活动:

  1. 重置包含单例类访问所需句柄的成员变量。
  2. 清除为我们的两个 Python 函数获得的句柄。

getGlobalStreamsToPythonSession:可在 清单 2 中看到,此方法被声明为一个静态方法。这是在调用 Streams 原生函数时进入这个类的入口点。因为我们希望每个 Streams 处理元素 (PE) 仅拥有这个 C++ 类的一个实例,所以有必要维护这个 C++ 类的一个单例对象。因此,当调用这个特定的方法时,会创建这个类的一个静态对象并返回给调用方。Streams 应用程序可通过这种方式获取 C++ 对象的静态句柄,使用该静态句柄随意调用任何 C++ 类方法。

initializeStreamsToPython:因为我们为每个进程维护着这个 C++ 类的一个单例对象,所以这个类可维护看在这里多个方法调用中使用和共享的状态变量。即使这个特定的应用程序没有存储状态,这也是需要考虑的一个重要的设计方面。使用 C++ 原生函数的 Streams 应用程序可使用这样一个方法来初始化状态变量。打开一个数据库连接并存储连接句柄供后续数据库访问使用,这是此方法的一个不错的用法。本文中描述的应用程序可确保只有对这个 C++ 方法的第一次调用会初始化一个全局句柄,这个句柄表明已创建这个类的单例对象。

getIpAddressFromUrl:这是这个 C++ 类中一个非常长的方法,它包含调用一个 Python 函数和抓取返回值的必要业务逻辑。Python 框架提供了一组全面的 C API 来将 Python 代码嵌入到 C 或 C++ 应用程序中。使用 Py_Initialize 在构造函数方法中初始化了 Python 解释程序后,我们可使用这个方法中的其他 Python/C API。此方法的调用方将传递一个 Web 地址作为方法参数(比如 www.ibm.com;请注意不应包含 URL 的 http:// 部分)。此方法还接受 4 个其他的字符串引用作为参数,结果将在这些参数中返回给调用方。因为我们在这个 C++ 类中使用了 SPL 命名空间,所以我们可以访问 SPL 数据类型,比如 rstring、uint32、list 等。许多 SPL 数据类型源自于 C++ 数据类型,比如 std::string、int、vector 等。

这个 C++ 方法中的第一个任务是获取我们希望调用的两个原生 Python 函数的有效指针。第一次调用此方法时,我们希望获取这两个 Python 函数的指针,并将它们存储在成员变量 pFunc1 和 pFunc2 中。这使您能够在后续调用中重用它们。为了获取 Python 函数的指针,我们必须先导入包含这两个函数的 Python 模块。在本例中,一个 Python 模块只是 Python 脚本的文件名去掉 .py 扩展名。我们必须使用 PyString_FromString 从一个包含 Python 模块名的 C++ 字符对象获取一个 Python 字符串对象。然后,对 PyImport_Import 的一次调用将获取我们的 Python 模块的一个句柄。在任何 Python/C API 出现错误时,我们将设置一个名为 importFailed 的成员变量并从此方法返回。只有在之前成功导入 Python 模块后,对这个 C++ 方法的后续调用才会继续。这些 Python/C API 错误可使用 PyErr_Occurred 和 PyErr_Print API 检测和记录。现在是时候介绍 SPLAPPTRC 了,它是一个 SPL C++ Macro API,允许我们将应用程序调试或跟踪信息记录到 Streams 日志系统中。它接受 3 个参数:日志级别、一个包含日志消息的 C++ 字符串对象,以及可用于特定于应用程序的日志过滤的一个方面。

导入我们的 UrlToIpAddress Python 模块后,我们现在使用 PyObject_HasAttrString API,传递希望调用的 Python 函数的名称,以便检查该函数是否确实存在于 Python 模块中。验证 Python 模块中的 Python 函数的可用性之后,我们可以使用 PyObject_GetAttrString API 获取该函数的一个指针。拥有 Python 函数的有效指针后,有必要使用 PyCallable_Check API 检查一下它是否确实可以调用。成功执行这些步骤后,我们的两个 C++ 成员变量(pFunc1 和 pFunc2)将指向用户编写的有效且可调用的 Python 函数。现在,我们可以调用 PyObject_CallFunction API,传递 pFunc1 或 pFunc2 成员变量和一组想要的函数参数来执行该函数。在我们的示例中,我们将一个字符串(Web 地址)作为参数传递给 Python 函数。因此,第二个参数是 s,表明该参数是字符串格式,第三个参数是表示常规 C 字符串形式的实际 Web 地址。因为我们的两个函数都返回字符串作为结果,所以我们使用 PyString_AsString API 将返回的 Python 字符串对象转换为常规的 C 字符串。我们将来自两个 Python 函数的结果字符串存储在我们自己的 rstring 局部变量中。正如 示例场景 中所解释的,我们的第一个 Python 函数以字符串的形式返回结果,该字符串中包含 3 个逗号分隔的部分。要解析这些 CSV 字段,可以调用标准 SPL 工具包函数 csvTokenize,将返回的值直接赋给调用方所传递的 C++ 方法参数引用。从 C++ 调用 Python 函数的过程就是这样的。

在这个 C++ 实现类中,还有两个重要的地方需要强调。当使用 PyImport_Import API 导入我们的 UrlToIpAddress.py 模块时,它如何知道 Python 脚本文件的物理位置?如果引用 C++ 构造函数,那么可以调用一个标准 POSIX API,通过一个句点字符将 PYTHONPATH 环境变量设置为当前目录。这是 PyImport_Import API 能够找到 Python 脚本并导入它的关键原因。在 Streams 应用程序中,当前的工作目录始终被设置为 SPL 项目目录中包含的 /data 子目录。因此,必须将我们的 Python 脚本复制到 /data 子目录中。否则,PyImport_Import API 无法找到和导入我们的 Python 脚本。在这个 C++ 实现类中,另一个要重点注意的是对 Py_DECREF API 的自由使用。所有 Python 都有一个引用计数,统计有多少位置拥有对某个对象的引用。当引用计数变为 0 时,该对象就会被取消分配。在 Python 中,引用计数始终显式操作。因此,在我们的代码中,只要不再需要某个有效的 Python 对象,就必须调用 Py_DECREF API。

清单 4. StreamsToPythonWrappers.h
#ifndef STREAMS_TO_PYTHON_WRAPPERS_H_
#define STREAMS_TO_PYTHON_WRAPPERS_H_

// Include the file that contains the class definition.
#include "StreamsToPython.h"

namespace calling_python_from_streams {
   // Establish a handle to the StreamsToPython to be
   // accessed within a PE.
   inline int32 initializeStreamsToPython(void) {
      return GlobalStreamsToPythonSession::
         getGlobalStreamsToPythonSession().initializeStreamsToPython();
   }

   // Get the IP address of a given URL.
   inline boolean getIpAddressFromUrl(rstring const & url,
      rstring & primaryHostName, rstring & alternateHostNames,
      rstring & ipAddressList, rstring & companyName) {
      return GlobalStreamsToPythonSession::
         getGlobalStreamsToPythonSession().
         getIpAddressFromUrl(url, primaryHostName,
         alternateHostNames, ipAddressList, companyName);
   }
}

#endif /* STREAMS_TO_PYTHON_WRAPPERS_H_ */

清单 4 是 StreamsToPtyhonLib C++ 项目中一个特定于 Streams 的扩展文件。我们前面已经提到过,为了使 Streams 应用程序能够调用 C++ 类中的任何方法,我们需要执行一些额外的工作。这些额外工作在这个包装器包含文件中完成,其中包含内联函数。此文件首先包含我们在 清单 2 中看到的 C++ 类接口文件。这些包装器函数在 StreamsToPythonLib 项目中与我们的实际 C++ 类相同的命名空间范围内定义。Streams 应用程序可调用这个包装器包含文件中指定的任何内联函数。每个内联函数通过调用静态方法 getGlobalStreamsToPythonSession 获取目标 C++ 类的一个单例对象。对这个静态方法的第一次调用对该 C++ 类执行一次静态实例化。每次调用这个静态方法时,都会返回该静态对象引用。通过获取单例对象的引用,给定的内联包装器函数现在可以调用该对象中的任何 C++ 方法,将任何返回值传递给 Streams SPL 应用程序。这项技术将会为您的其他真实 Streams 项目带来方便。


SPL 逻辑

了解本示例中使用的 Python 和 C++ 组件后,是时候组装一个基本的 Streams 应用程序了。我们将编写一个简单明了的 SPL 应用程序,使一个包含 3 个 Streams 运算符的流向图适用于 SPL 标准工具包。

表 2 显示了 streams-to-python SPL 项目目录。

表 2. streams-to-python SPL 项目目录
文件/目录说明
README.txt包含整个应用程序的简短描述的文件
python.wrapper.exampleSPL 项目目录
python.wrapper.example/streams_to_python.spl一个简单 SPL 文件,调用一个 C++ 原生函数,后者进而调用了 Python 函数
python.wrapper.example/native.functionSPL 原生函数目录
python.wrapper.example/native.function/function.xml包含 SPL 原生函数模型的 XML 文件
dataSPL 应用程序的数据目录
data/UrlInput.csv包含测试 Web 地址的输入文件
data/UrlToIpAddress.py一段简单的 Python 脚本,它的函数将从 Streams 调用。将它放在这里,是因为这是一个 Streams 应用程序当前的工作目录
data/Expected-UrlToIpAddress-Result-Feb2013.csv一个 CSV 文件,包含截至 2013 年 2 月来自这个应用程序的预期结果
impl/lib在该目录中,内置于 C++ 项目内的 .so 库将被复制。Streams 原生函数模式文件被配置为从此目录加载 .so
impl/include在该目录中,来自上述 C++ 项目的包含文件将被复制。Streams 原生函数模式文件被配置为在这里查找包含文件
build-standalone.sh该脚本将构建一个独立的 Streams 可执行文件;不需要 Streams 运行时
build-distributed.sh该脚本将构建一个分布式 Streams 可执行文件;需要 Streams 运行时
run-standalone.sh该脚本将运行此应用程序的独立模式可执行文件
run-distributed.sh该脚本将运行此应用程序的分布式模式可执行文件
stop-streams-instance.sh该脚本将停止一个指定的 Streams 实例
清单 5. streams_to_python.spl
namespace python.wrapper.example;

composite streams_to_python {
   // Define input and output schema for this application.
   type
      InputSchema = tuple<rstring url>;
      OutputSchema = tuple<rstring url, rstring primaryHostName, 
         rstring alternateHostNames, rstring ipAddressList, rstring companyName>;
		
   graph
      // Read from an input file all the URLs for which we need to 
      // get the corresponding IP addresses.
      stream<InputSchema> UrlInput = FileSource() {
         param
            file: "UrlInput.csv";
            initDelay: 4.0;
      }

      // In the custom operator below, we will call python code to get the
      // primary host name, alternative host names, and IP addresses.
      stream<OutputSchema> IpAddressOfUrl = Custom(UrlInput) {
         logic
            onTuple UrlInput: {
               mutable rstring _primaryHostName = "";
               mutable rstring _alternateHostNames = "";
               mutable rstring _ipAddressList = "";
               mutable rstring _companyName = "";
               // Call the C++ native function that in turn will call Python functions.
               boolean result = getIpAddressFromUrl(UrlInput.url, _primaryHostName,
                  _alternateHostNames, _ipAddressList, _companyName);
						
               if (result == true) {
                  mutable OutputSchema _oTuple = {};
                  _oTuple.url = UrlInput.url;
                  _oTuple.primaryHostName = _primaryHostName;
                  _oTuple.alternateHostNames = _alternateHostNames;
                  _oTuple.ipAddressList = _ipAddressList;
                  _oTuple.companyName = _companyName;
					
                  submit(_oTuple, IpAddressOfUrl);
               }
            }
      }
		
      // Write the results to a file using FileSink.
      () as FileWriter1 = FileSink(IpAddressOfUrl) {
         param
            file: "UrlToIpAddress-Result.csv";
      }
}

清单 5 是 SPL 流向图,它首先定义了一个命名空间,然后定义了一个 SPL 主要 composite。在类型部分中,为这个应用程序的输入和输出定义了两个元组数据类型。然后,在一个基本的图形子句中填入了 SPL 标准工具包中提供的 3 个 Streams 运算符。第一个运算符是一个 FileSource,它从默认位置(SPL 项目的 data 子目录)的一个输入 CSV 文件读取行。FileSource 运算符发出的元组由 Custom 运算符使用,后者调用一个使用 C++ 编写的 SPL 原生函数 (getIpAddressFromUrl)。可以看出,这段 C++ 代码进而将会执行 Python 函数,返回一个给定 Web 地址的结果。这些结果值被分配给一个输出元组,并从 Custom 运算符提交。最后,一个 FileSink 运算符使用来自 Custom 运算符的输出元组,将结果写入一个输出 CSV 文件。一定要注意的是,C++ 原生函数代码被编译到一个共享对象 (.so) 库中,如下所述。


函数模型

清单 6. function.xml
<?xml version="1.0" encoding="UTF-8"?>
<functionModel xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xmlns="http://www.ibm.com/xmlns/prod/streams/spl/function" 
   xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common"
   xsi:schemaLocation=
   "http://www.ibm.com/xmlns/prod/streams/spl/function functionModel.xsd">
  <functionSet>
    <headerFileName>StreamsToPythonWrappers.h</headerFileName>
    <cppNamespaceName>calling_python_from_streams</cppNamespaceName>
    <functions>
      <function>
        <description>Initialize the Streams to Python module</description>
        <prototype>public int32 initializeStreamsToPython()</prototype>
      </function>
      <function>
        <description>Get the IP addresses for a given URL</description>
        <prototype>public boolean getIpAddressFromUrl(rstring url, 
        mutable rstring primaryHostName, mutable rstring alternateHostNames,
        mutable rstring ipAddressList, mutable rstring companyName)</prototype>
      </function>
    </functions>
    <dependencies>
      <library>
        <cmn:description>Streams to Python Shared Library</cmn:description>
        <cmn:managedLibrary>
          <cmn:lib>StreamsToPythonLib</cmn:lib>
          <cmn:libPath>../../impl/lib</cmn:libPath>
          <cmn:includePath>../../impl/include</cmn:includePath>
          <cmn:command>../../impl/bin/archLevel</cmn:command>
        </cmn:managedLibrary>
      </library>
      <library>
        <cmn:description/>
        <cmn:managedLibrary>
          <cmn:lib>python2.6</cmn:lib>
          <cmn:libPath>/usr/lib64</cmn:libPath>
          <cmn:includePath>/usr/include/python2.6</cmn:includePath>
        </cmn:managedLibrary>
      </library>
    </dependencies>
  </functionSet>
</functionModel>

清单 6 是原生函数模型 XML 文件。在 清单 5 中的 SPL 代码中,我们看到在 Custom 运算符内调用了一个 C++ 原生函数。SPL 代码如何找到这个 C++ 代码的位置?原生函数模型 XML 文件是 XPL 代码与 C++ 代码的黏合剂。在编译 SPL 代码时,Streams 编译器通过我们在此 XML 文件中提供的信息来解析 C++ 函数名。在这个 XML 文件的开头,我们指定了包含 清单 4 中给出的内联原生函数的 C++ 包装器包含文件名称。然后我们指定了在其中定义了内联 C++ 原生函数的 C++ 命名空间。随后指定了一个 XML 代码段,我们在其中定义 C++ 原生函数的原型。一定要注意的是,原型声明是使用与 C++ 数据类型对应的 SPL 类型指定的。如果一个 C++ 原生函数想要一个作为引用传递的函数参数,那么应在函数原型中将该函数参数声明为可变参数。如果 C++ 原生函数逻辑通过一个共享对象 (.so) 文件提供,则应包含一个库 XML 代码段。我们必须在该代码段中指定库名称。(Linux 库的前 3 个字母通常为 ‘lib’,这 3 个字母应省略,而仅指定库名称。类似地,.so 扩展名也不需要。)必须指定 .so 文件的位置和共享对象库文件的包含文件的位置。一种不错的做法是,将共享对象库文件和它的包含文件捆绑为 SPL 项目目录的一部分,以便轻松地将它们发布到不同的 Streams 安装中。

如 表 2 中所示,SPL 项目目录包含用于此用途的 impl/lib 和 impl/include 子目录。在原生函数模型 XML 文件中,它表示为 ../../impl/lib 和 ../../impl/include(../../ 是 impl 目录的一个相对路径,可从函数模型 XML 文件的位置解析获得)。如果您的应用程序可用于多个 Linux® 版本及 32 位和 64 位 CPU,则有必要在不同的目录中提供不同版本的库。为了使此过程更容易自动化,本示例通过一个 shell 脚本 (../../impl/bin/archLevel) 来基于 Linux 版本和 CPU(32 位还是 64 位)自动选择正确的库位置。如果查阅 archLevel 脚本,您会理解这一个过程是如何完成的。最后,我们提供了一个库部分来指定对 libpython2.6.so 的依赖性,方法是指定它的名称、库的位置和它的包含文件。


构建示例

本文包含这里所讨论的示例的完整源代码(参见 下载)。一个给定的 Streams 应用程序可编译为两种模式(独立和分布式)。在独立模式下,整个 SPL 主要 composite被 编译为单个 Linux 可执行文件。在分布式模式下,SPL 主要 composite 被编译为可通过配置在一个或多个机器上运行的分布式组件。如果有一个满足 前提条件 的测试环境,那么您可以按照下面的说明来构建该示例:

  1. 获取 streams-to-python.zip 文件(参见 下载)。
  2. 将该文件解压到您安装 Streams 的 Linux 机器上的主目录中。
  3. 将目录更改为 ~/workspace1/StreamsToPythonLib C++ 项目目录。
  4. 您将运行 ./mk 脚本来创建 .so 共享库。
  5. 上面的命令创建 .so 文件并将它复制到 ../../impl/lib/x86_64.RHEL6 目录,将包含文件复制到 ../../impl/include 中。
  6. 将目录更改为 ~/workspace1/streams-to-python SPL 项目目录。
  7. 运行 ./build-standalone.sh 脚本创建一个独立模式应用程序。
  8. 运行 ./build-distributed.sh 脚本创建一个分布式模式应用程序。
  9. 您现在应看到包含独立和分布式可执行文件的 ~/workspace1/streams-to-python/output 目录。

运行示例

Streams 中的一个很好的特性是,我们可同时构建独立和分布式应用程序,无需更改源代码。我们现在可按照下面的说明运行这两个应用程序。

独立:这种类型的 Streams 应用程序是一个单一 Linux 可执行文件,无需启动和停止一个 Streams 运行时实例即可运行:

  1. 将目录更改为 ~/workspace1/streams-to-python SPL 项目目录。
  2. 运行 ./run-standalone.sh 脚本。
  3. 跳到下面的 “验证结果”。

分布式:这种类型的 Streams 一次性包含 SPL 流向图中指定的 Streams 运算符,这些运算符被编译为许多 PE(Processing Elements,处理元素)。这些处理元素以单个 Linux 进程的形式分布,以便使用多个 CPU 核心和一个机器集群。为了运行一个分布式模式 Streams 应用程序,需要启动一个 Streams 实例,提交应用程序作为该 Streams 实例上的一个作业,收集结果,然后停止 Streams 实例:

  1. 确保您已创建了一个 Streams 实例。
  2. 将目录更改为 ~/workspace1/streams-to-python SPL 项目目录。
  3. 使用一个命令行参数运行此脚本:./run-distributed.sh -i YOUR_STREAMS_INSTANCE_NAME
  4. 您应以参数形式将 Streams 实例名称提供给上一步中的脚本。
  5. 因为这是一个非常简单的应用程序,所以很快就会完成,只需等待 60 秒。
  6. 现在可运行以下脚本来停止该 Streams 实例: ./stop-streams-instance.sh -i YOUR_STREAMS_INSTANCE_NAME
  7. 跳到下面的 “验证结果”。

验证结果:无论运行独立还是分布式应用程序,我们的 SPL 程序逻辑都会从一个输入 CSV 文件 (data/UrlInput.csv) 中一次一行地读取 Web 地址。它调用 C++ 原生函数来获取给定 Web 地址的网络详细信息,将结果写入一个输出 CSV 文件 (data/UrlToIpAddress-Result.csv)。以下是已存储在本示例的输入 CSV 文件中的 Web 地址:

  • www.ibm.com
  • www.stanford.edu
  • www.cnn.com
  • www.ieee.org
  • www.facebook.com
  • www.yahoo.com

如果独立/分布式应用程序能够正常运行,那么您应该在 data/UrlToIpAddress-Result.csv 文件中看到结果。您的结果应类似于在撰写本文时对本示例执行的一次测试运行所得的结果 (data/Expected-UrlToIpAddress-Result-Feb2013.csv)。预期的结果如下所示。一个给定 Web 地址的结果包含 5 个具有以下格式的逗号分隔字段:WebAddress、PrimaryHostName、AlternateHostNames、IPAddresses 和 CompanyName。

"www.ibm.com","www-int.ibm.com.cs186.net","['www.ibm.com']","['129.42.58.158']","ibm"
"www.stanford.edu","www-v6.stanford.edu","['www.stanford.edu']","['171.67.215.200']","stanford"
"www.cnn.com","cnn-lax-tmp.gslb.vgtf.net","['www.cnn.com' 'www.cnn.com.vgtf.net']","['157.166.240.11' '157.166.240.13' '157.166.241.10' '157.166.241.11']","cnn"
"www.ieee.org","e1630.c.akamaiedge.net","['www.ieee.org' 'www.ieee.org.edgekey.net']","['72.247.70.198']","ieee"
"www.facebook.com","star.c10r.facebook.com","['www.facebook.com']","['66.220.158.27']","facebook"
"www.yahoo.com","ds-any-fp3-real.wa1.b.yahoo.com","['www.yahoo.com' 'fd-fp3.wg1.b.yahoo.com' 'ds-fp3.wg1.b.yahoo.com' 'ds-any-fp3-lfb.wa1.b.yahoo.com']","['98.139.183.24']","yahoo"


结束语

Python 在过去 20 年得到了长足的发展。作为一种动态编程语言,从学术界到世界知名的公司,它有众多热心的追随者。它的易用性和编程人员效率常常被视为是它在其他顶级语言(比如 C++、PHP 和 Java 编程语言)的包围中获得成功的重要原因。

IBM InfoSphere Streams 是一个市场领先的事件处理平台,提供了优秀的大数据分析功能。它通过其 SPL(Streams Processing Language,流处理语言)封装了一种强大、灵活、可扩展的编程模型,该语言支持开箱即用地集成使用 C++ 和 Java 语言编写的业务逻辑。

本文重点介绍了将两个领域(SPL 和 Python)的最佳功能结合在一起。它总结了一种在 Streams 应用程序中无缝混合使用 Python 编写的代码的方法,以便在扩展和分布式处理中利用它前所未有的功能。除了向您介绍 Streams+Python 集成之外,本文还介绍了直接从 SPL 代码中调用 C++ 类中的任意方法所涉及到的机制。

总体来讲,我们介绍了如何在 3 种语言 (SPL<-->C++<-->Python) 之间建立一个往返调用链。本文通过完全有效的示例代码(参见 下载)证明了这些概念。您可以独立 Linux 应用程序或分布式 Streams 应用程序的形式运行该示例。


下载

描述名字大小
SPL、C++ 和 Pythonstreams-to-python.zip26KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=948724
ArticleTitle=从 IBM InfoSphere Streams 调用 Python 代码
publish-date=10182013