| 免费下载:IBM® InfoSphere Streams V2 试用版 |
|---|
| 下载更多的 IBM 软件试用版,并加入 IBM 软件下载与技术交流群组,参与在线交流。 |
当运行一个分布式实时应用程序时,所有应用程序或解决方案开发人员都会遭遇的挑战是数据格式上的差异。开发人员应该执行的第一个步骤是数据格式转换,然后再对它们进行分析/处理。在一个实时的解决方案中,数据可能来自在不同的架构上运行的不同数据源。当来自异构架构的原始二进制格式的数据到达时,所产生的结果可能是不一致或不正确的。IBM InfoSphere Streams 提供了原生函数,用它们将自定义逻辑创建为可重用的函数,可以从 Streams Processing Language (SPL) 应用程序调用它。在本文中,我们将介绍通过利用 C++ 的函数重载概念和原生函数开发数据格式转换例程的步骤。然后,我们将在 InfoSphere Streams Studio 的帮助下,介绍如何使用这些函数作为 Streams 解决方案的一部分。高级的实现架构如下所示。
图 1. 实时数据格式转换
- 业务先决条件:要求本文的读者拥有从 InfoSphere Streams 设计和运行 SPL 应用程序的基本技能,以及 C++ 编程的中级技能。
- 软件先决条件:InfoSphere Streams 2.0 及其更高版本。
为原生函数开发准备 InfoSphere Streams Studio
按照以下步骤创建新的原生函数。
- 从 Streams studio 创建一个 SPL 应用程序。
- 创建一个新的原生函数定义,单击项目并选择 new SPL Native function model(新建 SPL 原生函数模型)。
- 对于 Streams 中每一个新的运算符或原生函数,都会创建一个新的运算符模型,并需要指定所有配置。运算符模型的代码片段如图 2 所示。
- 配置运算符模型:
- 指定包含函数逻辑的头文件的名称。
- 指定函数的原型,在头文件中表示出来。
图 2. 外部库依赖性
- 需要为运算符模型中各个部分设置的值如表 1 所示。
表 1. 要设置的原生函数模型的各种属性
| 部分 | 属性 | 描述 | 值 |
|---|---|---|---|
| Function Set | Cpp Namespace Name | 提到定义 C++ 函数的名称空间名称 | DataConvertors |
| Header File Name | 提到这个定义 C++ 函数的头文件的名称。头文件需要放在 SPL 应用程序中的 resources/impl 文件夹中。 | DataConvertors.h | |
| Function Set > Prototypes(右键单击 new child and function) | Prototype > Value | 指定的函数原型。原型需要提供函数模型中的所有原生函数。 | public void endian_swap(int64 x) |
C++ 函数代码执行数据格式转换,并将它与 InfoSphere Streams 的原生函数模型集成
通过利用 C++ 的函数重载概念,可以写入一个单独的函数,以处理多种类型的数据。
- 根据类型调用适当的尾数转换函数。SPL 中的原生函数将变成在 清单 1 中指定的格式。
- include 部分是需要包括所有外部引用的地方。
- 函数的名称空间名称,该名称应与在函数模型中提供的名称相同。
- 所有原生函数的函数逻辑。
清单 1. InfoSphere Streams 中的原生函数定义的格式
#include ..
namespace <namespace_name>
{
ret_type swap_endian(parameters..)
{
...
}
}
|
以下原生函数代码片段 (清单 2) 将执行尾数转换。基于参数的类型调用适当的函数。
清单 2. 该示例代码显示 short、long 和 int 类型的输入的尾数转换逻辑
void swap_endian(unsigned short& x)
{
x = (x>>8) |
(x<<8);
}
void swap_endian(unsigned int& x)
{
x = (x>>24) |
((x<<8) & 0x00FF0000) |
((x>>8) & 0x0000FF00) |
(x<<24);
}
void swap_endian(unsigned long& x)
{
x = (x>>56) |
((x<<40) & 0x00FF000000000000) |
((x<<24) & 0x0000FF0000000000) |
((x<<8) & 0x000000FF00000000) |
((x>>8) & 0x00000000FF000000) |
((x>>24) & 0x0000000000FF0000) |
((x>>40) & 0x000000000000FF00) |
(x<<56);
}
|
在使用 SPL 的 InfoSphere Streams 中使用原生函数与 Functor 运算符
一旦开发了原生函数,下一个步骤就是使用 SPL 应用程序中的函数,在来自各种源的数据到达时执行数据格式转换。
下面是应用程序的示例 SPL 代码,它在 Functor 运算符中调用原生函数 (清单 3)。该应用程序从一个 TCP 端口读取输入,并在进一步传播输入之前,对输入执行音译。
InfoSphere Streams 提供用于读取和写入数据的各种源和接收器适配器。在一个实时 SPL 应用程序中,可以使用任意数量的源和接收器。
清单 3. 示例 SPL 应用程序使用原生函数与 Functor 运算符执行数据格式转换
composite Main {
graph
stream<int32 inp> Src = TCPSource()
{
param
role : client;
address : "inputdomain.com";
port : 23145u;
}
stream<list<float64> result> Funct = Functor(Src)
{
output
Funct:result=swap_endian(Src.inp);
}
() as TCPWrite = TCPSink(Funct)
{
param
address:"143.121.112.76";
role : server;
port : 21344u;
}
}
|
现在已经准备好启动您的实时数据格式转换作业。构建并运行作业。
- InfoSphere Streams 提供了一个调试程序来支持对实时应用程序的调试。Streams Debugger 提供了各种命令和选项,可以使用它们轻松地跟踪和验证输出。可以在 Debugging with the Streams Debugger 中找到有关 Streams Debugger 的更多信息。
我们使用 InfoSphere Streams 的原生函数解决了如何执行数据格式转换的问题。我们已经介绍了在 InfoSphere Streams 中执行和做到这一点的不同配置设置。数据格式转换是一个关键组件,可解决因为分布式环境中的架构差异而造成的不一致和不正确结果。
学习
- 阅读标题为
"IBM InfoSphere Streams
Harnessing Data in Motion" 的 IBM Redbooks 出版物。
- 了解有关 IBM
Streams Processing Language (SPL) 的更多信息。
- 在 developerWorks Information Management 专区 了解有关信息管理的更多信息。查找技术文档、how-to 文章、教育、下载、产品信息等。
- 随时关注
developerWorks 技术活动 和 网络广播。
- 关注 Twitter 上的 developerWorks。
获得产品和技术
- 利用可从 developerWorks 直接下载的 IBM 试用软件,构建您的下一个开发项目。
- 现在您可以免费使用
DB2。下载 DB2 Express-C,这是面向社区的免费版本的 DB2 Express Edition,提供与 DB2 Express Edition 相同的核心特性,并为构建和部署应用程序提供了一个坚实基础。
讨论
