InfoSphere Streams 2.0 特性简介,第 2 部分: 使用集合

C++ 和 SPL 集合示例

随着 IBM InfoSphere® Streams 2.0 和 Streams 处理语言 (SPL) 的引入,开发人员能够访问两种新的集合类型:集 (set) 和映射 (map)。本文介绍这 3 种集合类型和如何在原生 SPL 应用程序中利用它们。您还将学习如何使用基础 C++ Streams 运算符访问集合。

Chris Howard, 大数据解决方案架构师,首席技术官办公室, IBM

Chris HowardChris Howard 自 1998 年加入 IBM,目前是一名解决方案架构师,主要关注大数据解决方案(InfoSphere Streams 和 InfoSphere BigInsights),他是首席技术官办公室的一名成员,效力于 IBM Software Group。他以前曾在 Dublin 管理过 EMEA Stream Computing Centre,负责为 IBM 的流处理解决方案 IBM InfoSphere Streams 进行客户端解决方案开发。他还是英国计算机协会的一名学会注册院士。



2012 年 6 月 25 日

免费下载:IBM® InfoSphere Streams V2 试用版
下载更多的 IBM 软件试用版,并加入 IBM 软件下载与技术交流群组,参与在线交流。

简介

本文介绍目前可供 InfoSphere Streams 2.0 开发人员使用的 3 种集合类型。集和映射复合类型不仅可用于 Streams 2.0 版,还为基础和复合类型的嵌套增添了灵活性。本文将同时使用 Streams 处理语言和 C++ 的一些示例展示新类型的强大功能。

前提条件

本文是为拥有 Streams 编程语言技能和 C++ 技能的 Streams 组件开发人员和应用程序编程人员编写的。您可以将本文用作参考,或者可以检查并运行其中的示例来演示所介绍的技术。要执行示例,您应该对 Streams 编程有一般的了解。

运行这些示例需要一台运行 Red Hat Enterprise Linux® 且安装了 InfoSphere Streams V2.0 或更新版本的计算机。

理解集合

Streams 处理语言 (SPL) 既支持基础类型(int 和 string 等),又支持可用于在表示 SPL 应用程序中的流数据的复合类型(集和映射)。一组复合类型(称为集合)可用于表示流数据的复杂排列,支持使用内置的 SPL 函数访问和操作集合内容。

在早期的 InfoSphere Streams 版本中,SPADE 语言支持一组受限制的复合类型,包括矩阵和列表,这些类型可用于管理基础类型分组。Streams 2.0 放弃了矩阵类型。支持的集合列表现在包含:

  • 集(2.0 版中新增)
  • 映射(2.0 版中新增)
  • 列表

集合已进一步扩展,可用它来表示复合类型的复杂嵌套,包括列表的列表,这为集合所管理的数据增添了更高的灵活性。

图 1. Streams 基础和复合类型
该图显示了基础类型(布尔、枚举、数字、时间戳、字符串和 blob)和复合类型(元组和集合,包括列表、集和映射)

受限类型和无限制类型

对于字符串等基础类型,SPL 还实现了具有有限长度的集合类型。任何有限长度集合的大小需要使用一个编译时常量来定义。也可以声明包含无限类型的有限集合,比如一个有限的 rstring 列表。从开发人员的角度讲,有限和无限类型之间的区别很小。但是,在使用有限类型时存在一些编译时优化优势。

请注意,无论受限制还是无限制,SQL 都会将任何集合限制于最多 231-1 个元素。

以下各节将更详细地介绍复合类型。


理解列表集合

对于 SPADE 用户

从 Streams 1.2 升级到 Streams 2.0 的开发人员会面临着删除矩阵复合类型的情况。在许多情况下,矩阵类型用于枚举一个嵌套列表 (list<list< >>)。随着嵌套复合类型的推出,开发人员现在能够使用一组更丰富、更灵活的一组功能来表示流数据。

列表是线性排序自己的内容的顺序容器。列表允许对其内容进行随机、基于 0 的访问。简单来讲,它们可被视为动态数组,允许访问各个列表项,但它们同时还允许动态分配新条目,支持根据需要扩展或压缩列表。

列表集合类型包含如 清单 1 所示的构造函数。

清单 1. 列表集合类型构造函数
list<T>		// unbounded
list<T>[n]	// bounded

// Where T is the type of the elements, and n is the size for bounded lists

图 2 显示了一个包含 6 个 rstring 的一维列表的简单示例。

图 2. 一个水果列表
该图显示了一个一维列表,其中的水果(apple、orange、banana、pear、pineapple 和 apple)对应于元素(0、1、2、3、4 和 5)

清单 2 展示了如何声明和访问这个简单的列表。

清单 2. 声明和访问一个简单列表
...
mutable rstring myFruit;

// Immutable - must be initialized upon declaration 

list<rstring> fruit = ["apple", "orange", "banana", "pear", "pineapple", "apple"];
myFruit = fruit[1];		// orange
...

请注意,列表不需要是惟一的(请注意那两个 apple),其中的各个元素可按它们的位置索引进行访问。


理解集集合

集是无序的容器,支持基于它们的值(集元素的值也是它的键)来检索它们的内容。集存储惟一的项(没有重复项)。将一个元素添加到一个集后,它就无法修改。但是,可根据需要使用集合扩展和压缩来添加或删除列表元素。

清单 3 显示了集集合类型构造函数。

清单 3. 集集合类型构造函数
set<T>		// unbounded
set<T>[n]	// bounded

// Where T is the type of the elements, and n is the size for the bounded set

图 3 显示了一个包含 5 个 rstring 的简单集。

图 3. 一个车辆集
该图显示了包含 5 个 rstring(car、truck、airplane、boat 和 bicycle)的简单集

清单 4 展示了如何声明一个无序的集。

清单 4. 声明一个简单、无序的集
set <rstring> vehicles = {"car", "truck", "airplane", "boat", "bicycle"};

理解映射集合

映射实现为在键-值对中存储元素的关联容器。与集类似,映射集合要求所有键都是惟一的。键和值项可声明为不同的类型,完全支持嵌套的类型(甚至键也可以具有任何类型,包括其他复合类型)。

清单 5 给出了映射集合类型构造函数。

清单 5. 映射集合类型构造函数
map<K,V>		// unbounded
map<K,V>[n]		// bounded

// Where K is the key component, V is the value component, and
n: is the size for the bounded map

图 4 给出了一个同时使用了 int:rstring 和 rstring:int 的映射的例子。

图 4. 每个月中的天数的映射
该图使用一个 int32 键显示了每个月中的天数映射

清单 6 展示了如何声明天数映射。

清单 6. 使用一个 int32 键声明映射y
map <int32, rstring> monthsYear = {1 : "January", 2 : "February", 3 : "March", ...};

图 5 给出了一个月份映射的例子。

图 5. 一个月份映射
该图显示使用一个 rstring 键显示了月份映射

清单 7 展示了如何声明月份映射。

清单 7. 使用 rstring 键声明映射
map <rstring, int32> daysMonth = {"January" : 31, "February" : 28, "March" : 31, ...};

使用集合

这一节介绍如何使用集合。

访问运算符

声明一个集合中包含的成员后,Streams 会提供一些简单的访问运算符。以下是使用集合项的 3 种基本机制:

  • l[i] 用于访问一个列表中存储的项
  • in 用于检查一个集合中的成员
  • for(T x in s) ... 用于迭代一个集合中的成员。请注意,就像一个 for 循环迭代一个集合一样,该集合对 for 循环的主体是不可变的。

请注意,当使用 in 运算符访问列表或映射时,左侧的运算符是访问键。如果希望使用值而不是键,需要使用恰当的 SPL 集合函数(下面将介绍),而不是 in 运算符。

SPL 集合函数

除了访问运算符,Streams 处理语言 (SPL) 标准工具包还提供多个函数,可在 SPL 表达式中使用这些函数来操作集合。SPL 函数声明为可变或不可变(不会变)。可变的函数会尝试修改传递的输入参数(在本例中为一个集合)的内容。不可变的函数返回一个包含函数结果的新对象(一个集合)。清单 8 显示一个示例。

清单 8. SPL 函数
<list T> public T concat(T values1, T values2)
// Returns a new list of values concatenating the contents of values1 and values2

<list T> public void concatM(mutable T values1, T values2)
// Appends values2 to the end of values1

这些函数提供了丰富的功能,包括基本的元素访问、查找功能、排序和特定于集的操作。表 1 提供了这些函数的详细视图和它们在各种集合类型上的应用。

表 1. 内置的 SPL 集合函数
函数描述列表映射
添加、合并和删除appendM向列表附加一项X
clearM清除(清空)一个集合XXX
concat / concatM串联元素XX
insert / insertM插入新元素XXX
remove / removeM删除元素XXX
操作reverse / reverseM反转值列表X
makeSequence创建一个元素序列X
元素访问at访问多个元素X
slice提取元素X
selectFromList从 2 个列表中选择元素X
Finding elementsfind查找匹配的值X
has查找集合中是否存在某个给定值XXX
findFirst查找某个匹配值在一个列表中第一次出现的位置X
findFirstNotOf查找一个不匹配的值在一个列表中第一次出现的位置X
findFirstOf查找一个匹配值在一个列表中第一次出现的位置X
findLast查找一个匹配值序列在一个列表中最后一次出现的位置X
findLastNotOf查找一个不匹配的值在一个列表中最后一次出现的位置X
findLastOf查找一个匹配值在一个列表中最后一次出现的位置X
findNotOf查找一个列表中的不匹配值X
findOf查找一个列表中的匹配值X
大小size获得一个列表的大小X
countDistinct计算一个列表的项数X
对比lexicographicalCompare按字典顺序对比两个列表X
pairwiseCompare基于元素进行对比X
pairwiseMax对比并获取较长的列表X
pairwiseMin对比并获取较短的列表X
集操作setDifference计算集的差X
setIntersection计算集的交集X
setUnionsetUnionX
元素排序sort / sortM排序值X
sortIndices排序值并返回索引X
ConversiontoSet将一个列表转换为集X

使用基础运算符和集合

这一节介绍如何使用基础运算符和集合。

C++ 运算符中的 SPL 类型处理

InfoSphere Streams 使用它的 SPL 类型到 C++ 类型的映射(通常通过扩展常用 C++ 类来实现),在原生 SPL 和基础运算符之间提供了无缝的数据交换。这种交换是标准的基础类型和更复杂(和嵌套的)复合类型之间的真正交换。

SPL 集合背后的原理

表 2 给出了 SPL 和 C++ 类型之间的对应关系。请注意 C++ 基础实现,因为这对每个 SPL 类型的行为具有一定的指导性。

表 2. SPL 和 C++ 类型映射
SPL 类型C++ 类型C++ 基础实现C++ 反射类型
list<T>SPL::list<T>std::vector<T>SPL::List
set<T>SPL::set<T>std::tr1::unordered_set<T>SPL::Set
map<T,K>SPL::map<T,K>std::tr1::unordered_map<T,K>SPL::Map
list<T>[N]SPL::blist<T,N>不适用SPL::BList
set<T>[N]SPL::bset<T,N>不适用SPL::BSet
map<T,K>[N]SPL::bmap<T,K,N>不适用SPL::BMap

示例

该示例演示了无限制和有限制的复合类型在 SPL 和 C++ 基础运算符之间的传递。图 6 从总体上显示了应用程序流。

图 6. 应用程序流
该图显示了集合的 Streams 应用程序流,它展示了无限和有限的复合类型在 SPL 和 C++ 基础运算符之间的传递

清单 9 显示了这个将嵌套复合类型传递到 Streams C++ 基础运算符的简单场景。该类型声明为一个简单、二维的整数列表,对该列表计算平均值并以一个一维的浮点值列表形式返回。出于简单性,初始列表从一个文件来源填充,该文件读取一个具有正确格式的元组。

清单 9. 将一个二维列表传递给一个基础运算符的 SPL 代码
composite Main {

     // Tuple schema definitions for re-use later
     type
          // Unbounded list of list of int32       
          listSchema2d = tuple <list<list<int32> > inputMatrix>;

          // Bounded list              	
          listSchema1d = tuple <list<float64>[100] outputList>;	
		
     ...

    graph

     // Read a CSV file containing 100x100 matrix of random integers
     stream <listSchema2d> matrixData = FileSource() {
          param 
               format : csv;
               file : "matrix.txt";
     }

     // Pass the matrix data to the Primitive Operator for processing
     stream <listSchema1d> listData = myOp(matrixData) {} 

     // Sink the results of the Primitive Operator to a new file
     () as sinkListData = FileSink(listData) {
          param 
               format : csv;
               file : "list.txt";
     }
     ...
}

清单 10 描述了下一步。

清单 10. 展示传递和访问 SPL::list 的 C++ 基础运算符 (_cpp.cgt)
...
    
// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{

     // Define the input and output port tuples
     IPort0Type const & tp = static_cast<IPort0Type const&>(tuple);
     OPort0Type outputTuple;

     // Get a reference to the 2d list of list of int32
     SPL::list<SPL::list<SPL::int32> > const & inputMatrix = tp.get_inputMatrix();
     // Could also use IPort0Type::inputMatrix_type const& ....

     // Loop over the input matrix and calculate the average of each row
     for (uint32_t dim1=0, dim1Max = inputMatrix.size(); dim1 < dim1Max; dim1++) {

          SPL::list<SPL::int32> const& row = inputMatrix[dim1];    	     
          // Could also use IPort0Type::inputMatrix_type::value_type const& ...
        
          // Sum the data values in the row
          SPL::float64 sum = 0.0;
          for (uint32_t dim2=0, dim2Max = row.size(); dim2 < dim2Max; dim2++)
               sum += row[dim2];

          // Append the average of the row to the output tuple
          outputTuple.get_outputList().push_back (sum / row.size());
     }

     // Submit output tuple
     submit(outputTuple, 0); 
}
...

结束语

本文介绍了结合使用集合和 InfoSphere Streams,无论是在 SPL 内原生地使用集合来管理复杂的数据结构,还是传递到基础运算符。

与以往一样,了解更多信息的最佳方式是亲自试用。您可以利用这些代码段构建您自己的 SPL 代码,开始实验可供您使用的集合类型和内置函数。有关更多信息,请参阅 参考资料 获取 Streams 论坛的链接。

致谢

衷心感谢 Mark Mendell 的帮助和对本文的审校。

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=822614
ArticleTitle=InfoSphere Streams 2.0 特性简介,第 2 部分: 使用集合
publish-date=06252012