使用 InfoSphere Streams 原生函数在分布式部署中完成实时数据表示转换

使用 C++ 的函数重载概念与 InfoSphere Streams 的原生函数

InfoSphere® Streams 提供了在事件发生时实时处理/报告数据或事件的能力。但有时,在多个平台上运行分布式应用程序时,数据会在具有不同处理器架构的计算机之间流动,这项任务可能非常具有挑战性。由于处理器架构的差异,数据表示可能会有所不同,这就需要应用程序开发人员在应用所需的逻辑之前转换数据表示,在数据是二进制格式时尤为如此。通过实时进行数据格式转换,从数据源读取数据后不久之后就可以获得更多一致的和有意义的结果。

Bharath Kumar Devaraju, 软件工程师, IBM

BharathKumar Devaraju 的照片Bharath Kumar Devaraju 自 2009 年加入 IBM,目前从事 IBM 云计算计划的 “数据即服务” 研发。他是一位通过 QualityStage 认证的解决方案开发人员。他拥有广泛的客户 POC 工作经验,并曾参与过成长市场方面的售前活动。



Mohan Dani, DAAS 云计算领导人, InfoSphere Streams, 企业数据质量架构师, IBM

Mohan Dani 目前领导 DAAS 云计算(InfoSphere Streams)方面的工作,他拥有超过 11 年的 IT 从业经验。他在实现大型企业解决方案方面拥有丰富的知识,曾担任过多种职位,其中包括 IBM 业务分析师、开发领导人和解决方案架构师,他擅长的领域是数据质量。他曾为 IBM 内部团队提供咨询,提供了出色的售前、售后和交付服务,他还是数据质量的思想领袖。



2012 年 12 月 07 日

免费下载:IBM® InfoSphere Streams V2 试用版
下载更多的 IBM 软件试用版,并加入 IBM 软件下载与技术交流群组,参与在线交流。

简介

当运行一个分布式实时应用程序时,所有应用程序或解决方案开发人员都会遭遇的挑战是数据格式上的差异。开发人员应该执行的第一个步骤是数据格式转换,然后再对它们进行分析/处理。在一个实时的解决方案中,数据可能来自在不同的架构上运行的不同数据源。当来自异构架构的原始二进制格式的数据到达时,所产生的结果可能是不一致或不正确的。IBM InfoSphere Streams 提供了原生函数,用它们将自定义逻辑创建为可重用的函数,可以从 Streams Processing Language (SPL) 应用程序调用它。在本文中,我们将介绍通过利用 C++ 的函数重载概念和原生函数开发数据格式转换例程的步骤。然后,我们将在 InfoSphere Streams Studio 的帮助下,介绍如何使用这些函数作为 Streams 解决方案的一部分。高级的实现架构如下所示。

图 1. 实时数据格式转换
图像显示了实时数据格式转换解决方案的高级数据流架构

先决条件

  • 业务先决条件:要求本文的读者拥有从 InfoSphere Streams 设计和运行 SPL 应用程序的基本技能,以及 C++ 编程的中级技能。
  • 软件先决条件:InfoSphere Streams 2.0 及其更高版本。

为原生函数开发准备 InfoSphere Streams Studio

SPL

SPL 代表 Streams Processing Language(流处理语言)。它是一个分布式数据流合成语言,支持快速的实时应用程序开发。SPL 旨在对代码的性能要求最苛刻的地方进行控制。具体而言,程序员可以直接控制图形拓扑和数据表示,并能够选择控制的线程模型、放置和物理布局等。

按照以下步骤创建新的原生函数。

  1. 从 Streams studio 创建一个 SPL 应用程序。
  2. 创建一个新的原生函数定义,单击项目并选择 new SPL Native function model(新建 SPL 原生函数模型)。
  3. 对于 Streams 中每一个新的运算符或原生函数,都会创建一个新的运算符模型,并需要指定所有配置。运算符模型的代码片段如图 2 所示。
  4. 配置运算符模型:
    • 指定包含函数逻辑的头文件的名称。
    • 指定函数的原型,在头文件中表示出来。
    图 2. 外部库依赖性
    图中显示了运算符模型的结构和它的各种属性
  5. 需要为运算符模型中各个部分设置的值如表 1 所示。
表 1. 要设置的原生函数模型的各种属性
部分属性描述
Function SetCpp Namespace Name提到定义 C++ 函数的名称空间名称DataConvertors
Header File Name提到这个定义 C++ 函数的头文件的名称。头文件需要放在 SPL 应用程序中的 resources/impl 文件夹中。DataConvertors.h
Function Set > Prototypes(右键单击 new child and function)Prototype > Value指定的函数原型。原型需要提供函数模型中的所有原生函数。public void endian_swap(int64 x)

C++ 函数代码执行数据格式转换,并将它与 InfoSphere Streams 的原生函数模型集成

通过利用 C++ 的函数重载概念,可以写入一个单独的函数,以处理多种类型的数据。

  • 根据类型调用适当的尾数转换函数。SPL 中的原生函数将变成在 清单 1 中指定的格式。
    • include 部分是需要包括所有外部引用的地方。
    • 函数的名称空间名称,该名称应与在函数模型中提供的名称相同。
    • 所有原生函数的函数逻辑。
清单 1. InfoSphere Streams 中的原生函数定义的格式
#include ..
                
namespace <namespace_name>
{
    ret_type swap_endian(parameters..)
    { 
    ...
    }
}

以下原生函数代码片段 (清单 2) 将执行尾数转换。基于参数的类型调用适当的函数。

清单 2. 该示例代码显示 short、long 和 int 类型的输入的尾数转换逻辑
void swap_endian(unsigned short& x)
{
    x = (x>>8) | 
    (x<<8);
}
                
void swap_endian(unsigned int& x)
{
    x = (x>>24) | 
    ((x<<8) & 0x00FF0000) |
    ((x>>8) & 0x0000FF00) |
    (x<<24);
}
                              
void swap_endian(unsigned long& x)
{
    x = (x>>56) | 
    ((x<<40) & 0x00FF000000000000) |
    ((x<<24) & 0x0000FF0000000000) |
    ((x<<8)  & 0x000000FF00000000) |
    ((x>>8)  & 0x00000000FF000000) |
    ((x>>24) & 0x0000000000FF0000) |
    ((x>>40) & 0x000000000000FF00) |
    (x<<56);
}

在使用 SPL 的 InfoSphere Streams 中使用原生函数与 Functor 运算符

一旦开发了原生函数,下一个步骤就是使用 SPL 应用程序中的函数,在来自各种源的数据到达时执行数据格式转换。

下面是应用程序的示例 SPL 代码,它在 Functor 运算符中调用原生函数 (清单 3)。该应用程序从一个 TCP 端口读取输入,并在进一步传播输入之前,对输入执行音译。

InfoSphere Streams 提供用于读取和写入数据的各种源和接收器适配器。在一个实时 SPL 应用程序中,可以使用任意数量的源和接收器。

清单 3. 示例 SPL 应用程序使用原生函数与 Functor 运算符执行数据格式转换
composite Main {
    graph
                
    stream<int32 inp> Src =  TCPSource()                                         
    {                                                                        
        param                                                                  
        role                : client;                                        
        address             : "inputdomain.com";                             
        port                : 23145u;                                         
    }                                    
    stream<list<float64> result> Funct = Functor(Src)
    {
        output
            Funct:result=swap_endian(Src.inp);
    }
                
    () as TCPWrite = TCPSink(Funct)
    {
        param
        address:"143.121.112.76";
        role : server;
        port : 21344u;
    } 
}

现在已经准备好启动您的实时数据格式转换作业。构建并运行作业。


异常条件

  • InfoSphere Streams 提供了一个调试程序来支持对实时应用程序的调试。Streams Debugger 提供了各种命令和选项,可以使用它们轻松地跟踪和验证输出。可以在 Debugging with the Streams Debugger 中找到有关 Streams Debugger 的更多信息。

结束语

我们使用 InfoSphere Streams 的原生函数解决了如何执行数据格式转换的问题。我们已经介绍了在 InfoSphere Streams 中执行和做到这一点的不同配置设置。数据格式转换是一个关键组件,可解决因为分布式环境中的架构差异而造成的不一致和不正确结果。

参考资料

学习

获得产品和技术

  • 利用可从 developerWorks 直接下载的 IBM 试用软件,构建您的下一个开发项目。
  • 现在您可以免费使用 DB2。下载 DB2 Express-C,这是面向社区的免费版本的 DB2 Express Edition,提供与 DB2 Express Edition 相同的核心特性,并为构建和部署应用程序提供了一个坚实基础。

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=853309
ArticleTitle=使用 InfoSphere Streams 原生函数在分布式部署中完成实时数据表示转换
publish-date=12072012