将 PureData System for Analytics 与 InfoSphere Streams 相集成

使用 Streams 操作符有效地将海量数据加载到 Netezza 中

本文介绍如何使用 Netezza 技术,执行从 InfoSphere® Streams 2.0 到 PureData™ System for Analytics N1001-010 的批量加载。示例 InfoSphere Streams 应用程序演示了 Netezza 如何建立一个高吞吐量连接,使两个系统能一起获得它们可单独提供的高吞吐量。

Sascha Laudien, IT 专家, IBM

Sascha Laudien 的照片Sascha Laudien 是一名 IT 专家,在位于德国的 IBM Data Warehousing Center of Excellence 工作。他于 2003 年加入 IBM,之后开始在在 DB2 数据库管理、数据仓库、信息集成和分析应用程序开发方面拓展他的专业领域。在 2009 年以前,他一直都是 IBM Information Management 服务团队的一员,负责使用丰富多样的数据仓库和商业智能功能为德国的各种客户项目提供支持。在 Data Warehousing Center of Excellence,他目前正在使用 IBM Smart Analytics System 运行概念证明和基准测试。



Günter Hentschel, 软件开发工程师, IBM

Güenter HentschelGünter Hentschel 自 2007 年开始在 IBM 担任软件开发工程师。他通过参与电信提供商 Ufone Pakistan 和 Sprint USA、德国军队和德国太阳能技术提供商的项目,积累了使用 InfoSphere Streams 开发应用程序的经验。



Heike Leuschner , 顾问软件工程师, IBM

Heike LeuschnerHeike Leuschner 是 IBM 大数据解决方案的一位顾问软件工程师。她于 2007 年加入 IBM。她的专业领域是 IBM InfoSphere BigInsights 解决方案的需求分析和设计。



2013 年 10 月 21 日

InfoSphere Streams 是一个高性能计算平台,支持持续且极其快速地分析来自多个来源的海量流数据。Netezza 设备加载这些数据集并存储它们,以供 PureData System for Analytics 分析。这个可扩展、大规模并行系统使客户端能够对海量数据执行复杂分析。

但是,Streams 2.0 标准数据库工具包所提供的默认 ODBC 操作符,不足以最大限度地发挥系统之间的高性能加载实用程序的优势。您需要使用批量加载功能 nzload,这是 Netezza 客户端提供的功能。本文介绍如何构造 C++ 原始操作符来使用该功能,以及如何从流处理语言 (Streams Processing Language, SPL) 中调用这些操作符。

准备 Netezza 和 Streams 环境

我们希望分析一下 Streams 与 Netezza 之间的互联 — 尤其是使用高性能加载实用程序从 Streams 向 Netezza 数据库加载数据。我们的测试环境包含 PureData System for Analytics N1001-010(由 Netezza 提供技术支持)和 Streams 2.0,它们安装在单个服务器上。(关于 Streams 的一般安装说明,请参阅 产品文档。)Netezza 设备的默认通信端口为 5480,我们使用这个端口建立连接。图 1 显示了我们的测试环境。

图 1. Netezza/Streams 连接
该插图左侧为 IBM PureData System for Analytics N1001-010,右侧为 IBM InfoSphere Streams 2.0,它们通过一个 5480 通信端口连接

Netezza 准备

Netezza 的其中一个优势是它的简单性。因此您无需关注基础数据库布局,比如缓冲池和表空间设计。对于 Netezza 准备,您只需创建一个数据库:"create database <DB-Name>",然后定义一个表,来自 Streams 的数据应加载到该表中:

"create table <TABLE-NAME>
      (col1 integer,
      col2 char(20),
      col3 timestamp)"

Streams 准备

要连接 Netezza 和 Streams,并提高高性能 Netezza 加载实用程序的速度,您需要在 Streams 服务器上安装 Netezza 客户端。下载 Netezza 客户端软件。查找 Information Management > IBM Netezza NPS Software and Clients > NPS_7.0.0 > Linux

要安装 Netezza 客户端:

  • 将下载的 Netezza 客户端安装包复制到 Streams 服务器(我们使用了 V6.0.5P6 中的 Linux® 安装包)。
  • 以根用户身份登录到 Streams 服务器上,并将目录更改为您将安装包复制到的目录。
  • 解压安装包:gunzip nz-linuxclient-v6.0.5.p6.tar.gz tar -xvf nz-linuxclient-v6.0.5.p6.tar。包含以下目录:
    • * webadmin (Netezza Online Administration Client)
    • * linux64/(包含 64 位 ODBC 驱动程序)
    • * Linux(包含 Netezza 客户端和 32 位 Netezza 驱动程序)
  • 更改为用于安装 Netezza 客户端的 Linux 目录并解压该客户端(使用 ./unpack)。
  • 切换到安装路径并更改为 bin 目录(例如使用 cd /usr/local/nz/bintry),尝试启动 Netezza 客户端接口的帮助页面,比如 Netezza 终端 (nzsql) 和 Netezza 加载实用程序 (nzload):
    nzsql -h
    nzload -h

    二者都将在 Streams 中使用。

  • 如果缺少任何库,可将路径添加到 LD_LIBRARY_PATH 环境变量 (export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:{new_path})。一定要使用 32 位库,而不是 64 位。使用 echo $LD_LIBRARY_PATH 检查以确保您正确设置了路径。
  • 如果设置了 LD_LIBRARY_PATH 并验证是否使用 32 位版本后,仍没有解决缺少库的问题(例如,libssl.so.4 仍然缺少),可以查找这个库的已安装版本(例如 libssl.0.9.8)并使用缺失版本的编号创建一个符号链接:ln -s libssl.so.0.9.8 libssl.so.4 ln -s libcrypto.so.0.9.8 libcrypto.so.4
  • 现在,Netezza 终端 (nzsql) 和 Netezza 加载实用程序 (nzload) 的帮助页面应可使用以下命令:
    /usr/local/nz/bin/nzsql -h
          This is nzsql, the IBM Netezza SQL interactive terminal.
          Usage: nzsql [options] [security options] [dbname [username] [password]] . . .
          
          nzload -h
             Usage: nzload [-h|-rev] [<options>]
             Options: . . .

    Netezza 通信端口 (5480) 必须在 Netezza 服务器与 InfoSphere Streams 服务器之间打开。
  • 此外,您可将 Netezza 客户端二进制文件的目录路径添加到 PATH 环境变量中,以从全局上访问您的 shell 上的 Netezza 客户端应用程序:export PATH=$PATH:{path of Netezza client binaries}

Streams 实现细节

Streams 示例应用程序读取并分析 XML 文件源,使用从其他文件源查找的结果来充实该数据,然后将其加载到 Netezza 中。

图 2. Streams 示例应用程序数据流
该插图显示了数据流;左侧的 XML 文件和顶部的查找文件输入到 Streams 中;数据从 Streams 流经 nzload,然后到达右侧的 Netezza

Streams 应用程序在 SPL 中使用操作符实现。有许多现有的操作符,但您可以编写自己的操作符:SPL 中的复合操作符用于更高的抽象级别,C++ 中的原始操作符用于特殊功能。我们的示例应用程序中有两层:一个包含粗粒度复合操作符的顶级层,在每个复合操作符中有一个细粒度原始操作符的层。


Streams 示例应用程序复合运算符

我们的示例 Streams 应用程序包含 5 个复合操作符:

  1. XMLBlob 读取 XML 文件。
  2. parsedMeasurements 解析从 XMLBlob 收到的 XML 结构。
  3. measurementsPerDevice 验证并充实借助查找而从 parsedMeasurements 收到的解析数据。
  4. anomaliesDetector 为从 parseMeasurements 收到的已检测到的无效值编写日志。
  5. dbWriter 将从 parsedMeasurements 收到的数据写入 Netezza。

Streams/Netezza 连接(我们将重点介绍)是 dbWriter 复合操作符的一部分。图 3 展示了复合操作符之间的关系。

图 3. Streams 应用程序复合操作符
该插图显示了 5 个应用程序复合操作符

Streams/Netezza 加载操作符

dbWriter 使用了 10 个操作符:2 个 Custom、1 个 ThreadedSplit、4 个 PrepareNetezza、1 个 Beacon、1 个 Union 和 1 个 LoadNetezza。来自复合操作符 measurementsPerDevice 的已充实的数据同时加载到两个操作符中:第一个 CustomThreadedSplit

第一个 Custom 操作符仅用于日志记录。ThreadedSplit 操作符将数据流分发给 4 个并行的 PrepareNetezza 操作符,以加速准备过程。

PrepareNetezza 操作符以一种高性能方式将具有混合属性类型的数据元组转换为 rstring 类型的纯逗号分隔元组。准备好的数据再次使用 Union 操作符重新连接到一个流。一个 Beacon 操作符帮助生成一个时间触发器。第二个 Custom 操作符从 UnionBeacon 读取数据。只要从 Beacon 操作符收到一个时间触发器,它就会向来自 Union 操作符的数据流添加标点符号。

最后,LoadNetezza 操作符打开一个通往 Netezza 的管道,将数据写入该管道,只要写入了一定数量的条目或收到了一个标点符号,就将加入管道的数据部分传输到 Netezza nzload。这通过关闭管道来完成。图 4 显示了这些流。

图 4. dbWriter 复合操作符
该架构图显示了 dbWriter 复合操作符

清单 1 给出了 图 4 中所示的复合操作符的合适的 SPL 代码段。

清单 1. dbWriter SPL 代码
 . . .
(stream<t_compactedValuesNetezza> records1;
 stream<t_compactedValuesNetezza> records2;
 stream<t_compactedValuesNetezza> records3; 
 stream<t_compactedValuesNetezza> records4) = ThreadedSplit(formattedCompactedValues){
          param
          bufferSize: 3000u;
   }

   stream <rstring buf> prepare1 = PrepareNetezza(records1) {}
   stream <rstring buf> prepare2 = PrepareNetezza(records2) {}
   stream <rstring buf> prepare3 = PrepareNetezza(records3) {}
   stream <rstring buf> prepare4 = PrepareNetezza(records4) {}

   stream <rstring buf> Pipe = Union(prepare1; prepare2; prepare3; prepare4 ) {}

   stream <rstring dummy> Punct = Beacon() {
          param
             period: $NETEZZA_tmclosepipe;
   }

   stream <rstring buf> punctPipe = Custom(Pipe;Punct) {
      logic
      onTuple Punct: submit(Sys.WindowMarker, punctPipe);
      onTuple Pipe:  submit(Pipe, punctPipe);
   }

() as writeRecord = LoadNetezza(punctPipe) {
  param
      hostname  : $NETEZZA_hostname;
      database  : $NETEZZA_database;
      user      : $NETEZZA_user;
      password  : $NETEZZA_password;
      tablename : $NETEZZA_tablename;
      pipelen   : $NETEZZA_pipelen;
 }
. . .

参数 hostname、database、user、password、tablenamepipelen 的值需要包含数据库连接的适当内容。hostname、database、user、password 和 tablename 都是 rstrings,而 pipelen 是一个 int32,例如,它的值可能是 200000。

原始的 C++ 操作符

您可在 developerWorks/Community/files 中的 Streams Operators (SPL) > Netezza.tar 下找到原始的 C++ 操作符。对于清单 3 中的示例,LoadNetezza_cpp.cgt 通过标点符号得到了增强,有一个时间触发器和一个记录计数触发器,这在输入流不连续时很有用。在该情况下,增强的 LoadNetezza 可确保数据不会在管道中停留超过一定时间,因为该管道同时会知悉时间(通过标点符号提供)和记录量。

Custom、ThreadedSplit、UnionBeacon 是标准的 SPL 操作符。PrepareNetezzaLoadNetezza 是我们自己添加的(C++ 原始)操作符。

清单 2 显示了 PrepareNetezza 操作符的实现。该操作符将具有混合属性类型的输入元组转换为一个逗号分隔的 rstring 格式。因为 PrepareNetezza 操作符不是 Streams 2.0 的标准工具包集的一部分,所以它设计为一个 C++ 原始操作符。

清单 2. PrepareNetezza_cpp.cgt 操作符代码
<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR(){
   // Initialize the numberCache 
   for (int i=0; i < NUM_NUMSTRINGS; i++)
     snprintf(numberCache[i], sizeof(numberCache[i]), "%d", i);
}

// Destructor
MY_OPERATOR::~MY_OPERATOR() {} 

// Notify port readiness
void MY_OPERATOR::allPortsReady() {}

// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown() {}

// Processing for source and threaded operators   
void MY_OPERATOR::process(uint32_t idx) {}
 
// Tuple processing for mutating ports 
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
  IPort0Type const & ituple = static_cast<IPort0Type const&>(tuple);
  SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << ituple, "dpsop");
  std::stringstream buf;
  for(ConstTupleIterator ti=tuple.getBeginIterator(); ti!=tuple.getEndIterator(); ++ti)
  {
      ConstTupleAttribute attribute = *ti;
      std::string name = attribute.getName();
      ConstValueHandle handle = attribute.getValue();
      std::string temp = getStringValue(handle);
      buf << temp << ",";
  }
  size_t found;
  buf << "\n";
  OPort0Type outTuple;
  outTuple.set_buf(buf.str());
  submit(outTuple, 0);
}

// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {}

// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {}

std::string MY_OPERATOR::getStringValue(ConstValueHandle & handle) {
  char buf[128];
  switch(handle.getMetaType()) {
    case SPL::Meta::Type::INT64:{
        SPL::int64 v_int64= handle;
        if (v_int64 >= NUM_NUMSTRINGS)
          snprintf(buf, sizeof(buf), "%ld", v_int64);
        else
          strcpy(buf, numberCache[v_int64]);
        break;
      }
    case SPL::Meta::Type::INT32:{
        SPL::int32 v_int32= handle;
        if (v_int32 >= NUM_NUMSTRINGS)
          snprintf(buf, sizeof(buf), "%d", v_int32);
        else
          strcpy(buf, numberCache[v_int32]);
        break;
      }
    case SPL::Meta::Type::INT16:{
        SPL::int16 v_int16= handle;
        if (v_int16 >= NUM_NUMSTRINGS)
          snprintf(buf, sizeof(buf), "%d", v_int16);
        else
          strcpy(buf, numberCache[v_int16]);
        break;
      }
    case SPL::Meta::Type::FLOAT64: {
        SPL::float64 v_float64= handle;
        snprintf(buf, sizeof(buf), "%f", v_float64);
        break;
      }
    case SPL::Meta::Type::FLOAT32: {
        SPL::float32 v_float32= handle;
        snprintf(buf, sizeof(buf), "%f", v_float32);
        break;
      }
    case SPL::Meta::Type::RSTRING: {
        SPL::rstring v_rstring= handle;
        std::string s = handle;
        strcpy(buf, s.c_str());  // todo check len
        break;
      }
    default:
     sprintf(buf, "X");
  }  
return(buf);
}
<%SPL::CodeGen::implementationEpilogue($model);%>

清单 3 显示了 LoadNetezza 操作符的详细信息,它也设计为一个 C++ 原始操作符。

清单 3. LoadNetezza_cpp.cgt 代码
#include <stdio.h>
#include <string.h>
#include <unistd.h>
using namespace std;
<%SPL::CodeGen::implementationPrologue($model);%>
<%
  my $hostname   = $model->getParameterByName("hostname");
  my $database   = $model->getParameterByName("database");
  my $user       = $model->getParameterByName("user");
  my $password   = $model->getParameterByName("password");
  my $tablename  = $model->getParameterByName("tablename");
  my $pipelen    = $model->getParameterByName("pipelen");
  if($hostname) { $hostname = $hostname->getValueAt(0)->getSPLExpression();}
  if($database) { $database = $database->getValueAt(0)->getSPLExpression();}
  if($tablename){ $tablename = $tablename->getValueAt(0)->getSPLExpression();}
  if($user)     { $user = $user->getValueAt(0)->getSPLExpression();}
  if($password) { $password = $password->getValueAt(0)->getSPLExpression();}
  if($pipelen)  { $pipelen = $pipelen->getValueAt(0)->getSPLExpression();} 
%>
// Constructor
MY_OPERATOR::MY_OPERATOR() { xx = 0; zz = 0;}
// Destructor
MY_OPERATOR::~MY_OPERATOR() {}
// Notify port readiness
void MY_OPERATOR::allPortsReady() {
    // Notifies that all ports are ready. No tuples should be submitted before
    // this. Source operators can use this method to spawn threads.
     hostname  =  <%=$hostname%>;  
     userid    =  <%=$user%>;  
     password  =  <%=$password%>;  
     database  =  <%=$database%>;  
     tablename =  <%=$tablename%>;
     pipelen   =  <%=$pipelen%>;
     createThreads(1); // Create source thread   
}
// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown() {}
// Processing for source and threaded operators   
void MY_OPERATOR::process(uint32_t idx){
  SPCDBG(L_ERROR, "Processing uint tuple tuple from input port 0 " , "dpsop");
}
// Tuple processing for mutating ports 
void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {}
// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {  
   SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << tuple, "dpsop");
   if ( xx == 0 ) { 
      SPLLOG(L_ERROR," reset xx ","dpsop");
      if (pipe(pipes)) {
         printf("error creating pipe\n");
      }     
      int rc = fork(); // Fork/exec a nzload process
      switch(rc) {
      case -1: printf("Error on fork.\n");
      case  0: // child
        // set stdin to read off the pipe. then exec nzload
        if (-1 == dup2(pipes[0], 0)) { printf("dup error\n");}
        close(pipes[1]);
        execlp("nzload","nzload", "-host", hostname.c_str(), "-u", userid.c_str(),
               "-pw", password.c_str(), "-db", database.c_str(), "-t", tablename.c_str(), 
               "-delim", ",", NULL);
        printf("exec error\n");
      }  
      child = rc;
      close(pipes[0]);
   }   
   xx = xx + 1; 
   zz++;
   IPort0Type const & ituple = static_cast<IPort0Type const&>(tuple);
   SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << ituple, "dpsop");
   ConstTupleIterator ti=tuple.getBeginIterator();
   ConstTupleAttribute attribute = *ti;
   std::string buf = attribute.getValue(); 
   // Write the data values to the pipe
   write(pipes[1], buf.c_str(), buf.size()); 
   if ((0 == zz % pipelen) || (1 == zz))
      SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
   if (xx == pipelen ) {
      SPLLOG(L_ERROR, " ZZZ before waitpid wrote record: " << zz , "dpsop"  );
      close(pipes[1]);
      SPLLOG(L_ERROR, " ZZZ after waitpid wrote record: " << zz , "dpsop"  );
      xx = 0;
   }
}
// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {
   SPCDBG(L_ERROR, "Processing punctuation  tuple from input port 0 " , "dpsop");
   if(punct==Punctuation::WindowMarker) {
      close(pipes[1]);
      waitpid(child, NULL, 0);
      SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
      xx = 0;
   } else {
     if(punct==Punctuation::FinalMarker) {
        close(pipes[1]);
        waitpid(child, NULL, 0);
        SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
      }
   }    
}
<%SPL::CodeGen::implementationEpilogue($model);%>

在这里,您可一看到 LoadNetezza 打开了一个管道,将数据写入其中,直到收到一定量的数据或一个标点符号,然后关闭管道,执行 nzload 来将加入管道的批量数据传输到 Netezza。这样就建立了您的高吞吐量连接。


结束语

Streams 和 Netezza 是处理海量数据的优秀技术。Streams 可在很短的时间内(近实时地)读取、转换和转发海量数据。Netezza 设备加载这些数据集并存储它们,以用于分析。但是,InfoSphere Streams 2.0 标准数据库工具包所提供的默认 ODBC 操作符,无法最大限度发挥两个系统之间的高性能 Netezza 加载实用程序的优势。您需要使用批量加载功能 nzload,这是由 Netezza 客户端提供的功能。本文介绍了如何构造 C++ 原始操作符来使用该功能,以及如何从 SPL 调用这些操作符。对于未来的研究工作,InfoSphere Streams 3.0 所提供的数据库工具包中包含类似的操作符。

参考资料

学习

获得产品和技术

  • 获取 InfoSphere Streams 的试用版,构建应用程序来快速摄取、分析和关联来自数千个实时来源的信息。
  • 获取 IBM InfoSphere BigInsights 的试用版,管理和分析海量静止的结构化和非结构化数据。
  • 使用 IBM 试用软件 构建您的下一个开发项目,这些软件可直接从 developerWorks 下载获得。

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=949449
ArticleTitle=将 PureData System for Analytics 与 InfoSphere Streams 相集成
publish-date=10212013