内容


将 PureData System for Analytics 与 InfoSphere Streams 相集成

使用 Streams 操作符有效地将海量数据加载到 Netezza 中

Comments

InfoSphere Streams 是一个高性能计算平台,支持持续且极其快速地分析来自多个来源的海量流数据。Netezza 设备加载这些数据集并存储它们,以供 PureData System for Analytics 分析。这个可扩展、大规模并行系统使客户端能够对海量数据执行复杂分析。

但是,Streams 2.0 标准数据库工具包所提供的默认 ODBC 操作符,不足以最大限度地发挥系统之间的高性能加载实用程序的优势。您需要使用批量加载功能 nzload,这是 Netezza 客户端提供的功能。本文介绍如何构造 C++ 原始操作符来使用该功能,以及如何从流处理语言 (Streams Processing Language, SPL) 中调用这些操作符。

准备 Netezza 和 Streams 环境

我们希望分析一下 Streams 与 Netezza 之间的互联 — 尤其是使用高性能加载实用程序从 Streams 向 Netezza 数据库加载数据。我们的测试环境包含 PureData System for Analytics N1001-010(由 Netezza 提供技术支持)和 Streams 2.0,它们安装在单个服务器上。(关于 Streams 的一般安装说明,请参阅 产品文档。)Netezza 设备的默认通信端口为 5480,我们使用这个端口建立连接。图 1 显示了我们的测试环境。

图 1. Netezza/Streams 连接
该插图左侧为 IBM PureData System for Analytics N1001-010,右侧为 IBM InfoSphere Streams 2.0,它们通过一个 5480 通信端口连接
该插图左侧为 IBM PureData System for Analytics N1001-010,右侧为 IBM InfoSphere Streams 2.0,它们通过一个 5480 通信端口连接

Netezza 准备

Netezza 的其中一个优势是它的简单性。因此您无需关注基础数据库布局,比如缓冲池和表空间设计。对于 Netezza 准备,您只需创建一个数据库:"create database <DB-Name>",然后定义一个表,来自 Streams 的数据应加载到该表中:

"create table <TABLE-NAME>
      (col1 integer,
      col2 char(20),
      col3 timestamp)"

Streams 准备

要连接 Netezza 和 Streams,并提高高性能 Netezza 加载实用程序的速度,您需要在 Streams 服务器上安装 Netezza 客户端。下载 Netezza 客户端软件。查找 Information Management > IBM Netezza NPS Software and Clients > NPS_7.0.0 > Linux

要安装 Netezza 客户端:

  • 将下载的 Netezza 客户端安装包复制到 Streams 服务器(我们使用了 V6.0.5P6 中的 Linux® 安装包)。
  • 以根用户身份登录到 Streams 服务器上,并将目录更改为您将安装包复制到的目录。
  • 解压安装包:gunzip nz-linuxclient-v6.0.5.p6.tar.gz tar -xvf nz-linuxclient-v6.0.5.p6.tar。包含以下目录:
    • * webadmin (Netezza Online Administration Client)
    • * linux64/(包含 64 位 ODBC 驱动程序)
    • * Linux(包含 Netezza 客户端和 32 位 Netezza 驱动程序)
  • 更改为用于安装 Netezza 客户端的 Linux 目录并解压该客户端(使用 ./unpack)。
  • 切换到安装路径并更改为 bin 目录(例如使用 cd /usr/local/nz/bintry),尝试启动 Netezza 客户端接口的帮助页面,比如 Netezza 终端 (nzsql) 和 Netezza 加载实用程序 (nzload):
    nzsql -h
    nzload -h

    二者都将在 Streams 中使用。

  • 如果缺少任何库,可将路径添加到 LD_LIBRARY_PATH 环境变量 (export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:{new_path})。一定要使用 32 位库,而不是 64 位。使用 echo $LD_LIBRARY_PATH 检查以确保您正确设置了路径。
  • 如果设置了 LD_LIBRARY_PATH 并验证是否使用 32 位版本后,仍没有解决缺少库的问题(例如,libssl.so.4 仍然缺少),可以查找这个库的已安装版本(例如 libssl.0.9.8)并使用缺失版本的编号创建一个符号链接:ln -s libssl.so.0.9.8 libssl.so.4 ln -s libcrypto.so.0.9.8 libcrypto.so.4
  • 现在,Netezza 终端 (nzsql) 和 Netezza 加载实用程序 (nzload) 的帮助页面应可使用以下命令:
    /usr/local/nz/bin/nzsql -h
          This is nzsql, the IBM Netezza SQL interactive terminal.
          Usage: nzsql [options] [security options] [dbname [username] [password]] . . .
          
          nzload -h
             Usage: nzload [-h|-rev] [<options>]
             Options: . . .

    Netezza 通信端口 (5480) 必须在 Netezza 服务器与 InfoSphere Streams 服务器之间打开。
  • 此外,您可将 Netezza 客户端二进制文件的目录路径添加到 PATH 环境变量中,以从全局上访问您的 shell 上的 Netezza 客户端应用程序:export PATH=$PATH:{path of Netezza client binaries}

Streams 实现细节

Streams 示例应用程序读取并分析 XML 文件源,使用从其他文件源查找的结果来充实该数据,然后将其加载到 Netezza 中。

图 2. Streams 示例应用程序数据流
该插图显示了数据流;左侧的 XML 文件和顶部的查找文件输入到 Streams 中;数据从 Streams 流经 nzload,然后到达右侧的 Netezza

Streams 应用程序在 SPL 中使用操作符实现。有许多现有的操作符,但您可以编写自己的操作符:SPL 中的复合操作符用于更高的抽象级别,C++ 中的原始操作符用于特殊功能。我们的示例应用程序中有两层:一个包含粗粒度复合操作符的顶级层,在每个复合操作符中有一个细粒度原始操作符的层。

Streams 示例应用程序复合运算符

我们的示例 Streams 应用程序包含 5 个复合操作符:

  1. XMLBlob 读取 XML 文件。
  2. parsedMeasurements 解析从 XMLBlob 收到的 XML 结构。
  3. measurementsPerDevice 验证并充实借助查找而从 parsedMeasurements 收到的解析数据。
  4. anomaliesDetector 为从 parseMeasurements 收到的已检测到的无效值编写日志。
  5. dbWriter 将从 parsedMeasurements 收到的数据写入 Netezza。

Streams/Netezza 连接(我们将重点介绍)是 dbWriter 复合操作符的一部分。图 3 展示了复合操作符之间的关系。

图 3. Streams 应用程序复合操作符
该插图显示了 5 个应用程序复合操作符
该插图显示了 5 个应用程序复合操作符

Streams/Netezza 加载操作符

dbWriter 使用了 10 个操作符:2 个 Custom、1 个 ThreadedSplit、4 个 PrepareNetezza、1 个 Beacon、1 个 Union 和 1 个 LoadNetezza。来自复合操作符 measurementsPerDevice 的已充实的数据同时加载到两个操作符中:第一个 CustomThreadedSplit

第一个 Custom 操作符仅用于日志记录。ThreadedSplit 操作符将数据流分发给 4 个并行的 PrepareNetezza 操作符,以加速准备过程。

PrepareNetezza 操作符以一种高性能方式将具有混合属性类型的数据元组转换为 rstring 类型的纯逗号分隔元组。准备好的数据再次使用 Union 操作符重新连接到一个流。一个 Beacon 操作符帮助生成一个时间触发器。第二个 Custom 操作符从 UnionBeacon 读取数据。只要从 Beacon 操作符收到一个时间触发器,它就会向来自 Union 操作符的数据流添加标点符号。

最后,LoadNetezza 操作符打开一个通往 Netezza 的管道,将数据写入该管道,只要写入了一定数量的条目或收到了一个标点符号,就将加入管道的数据部分传输到 Netezza nzload。这通过关闭管道来完成。图 4 显示了这些流。

图 4. dbWriter 复合操作符
该架构图显示了 dbWriter 复合操作符
该架构图显示了 dbWriter 复合操作符

清单 1 给出了 图 4 中所示的复合操作符的合适的 SPL 代码段。

清单 1. dbWriter SPL 代码
 . . .
(stream<t_compactedValuesNetezza> records1;
 stream<t_compactedValuesNetezza> records2;
 stream<t_compactedValuesNetezza> records3; 
 stream<t_compactedValuesNetezza> records4) = ThreadedSplit(formattedCompactedValues){
          param
          bufferSize: 3000u;
   }

   stream <rstring buf> prepare1 = PrepareNetezza(records1) {}
   stream <rstring buf> prepare2 = PrepareNetezza(records2) {}
   stream <rstring buf> prepare3 = PrepareNetezza(records3) {}
   stream <rstring buf> prepare4 = PrepareNetezza(records4) {}

   stream <rstring buf> Pipe = Union(prepare1; prepare2; prepare3; prepare4 ) {}

   stream <rstring dummy> Punct = Beacon() {
          param
             period: $NETEZZA_tmclosepipe;
   }

   stream <rstring buf> punctPipe = Custom(Pipe;Punct) {
      logic
      onTuple Punct: submit(Sys.WindowMarker, punctPipe);
      onTuple Pipe:  submit(Pipe, punctPipe);
   }

() as writeRecord = LoadNetezza(punctPipe) {
  param
      hostname  : $NETEZZA_hostname;
      database  : $NETEZZA_database;
      user      : $NETEZZA_user;
      password  : $NETEZZA_password;
      tablename : $NETEZZA_tablename;
      pipelen   : $NETEZZA_pipelen;
 }
. . .

参数 hostname、database、user、password、tablenamepipelen 的值需要包含数据库连接的适当内容。hostname、database、user、password 和 tablename 都是 rstrings,而 pipelen 是一个 int32,例如,它的值可能是 200000。

Custom、ThreadedSplit、UnionBeacon 是标准的 SPL 操作符。PrepareNetezzaLoadNetezza 是我们自己添加的(C++ 原始)操作符。

清单 2 显示了 PrepareNetezza 操作符的实现。该操作符将具有混合属性类型的输入元组转换为一个逗号分隔的 rstring 格式。因为 PrepareNetezza 操作符不是 Streams 2.0 的标准工具包集的一部分,所以它设计为一个 C++ 原始操作符。

清单 2. PrepareNetezza_cpp.cgt 操作符代码
<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR(){
   // Initialize the numberCache 
   for (int i=0; i < NUM_NUMSTRINGS; i++)
     snprintf(numberCache[i], sizeof(numberCache[i]), "%d", i);
}

// Destructor
MY_OPERATOR::~MY_OPERATOR() {} 

// Notify port readiness
void MY_OPERATOR::allPortsReady() {}

// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown() {}

// Processing for source and threaded operators   
void MY_OPERATOR::process(uint32_t idx) {}
 
// Tuple processing for mutating ports 
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
  IPort0Type const & ituple = static_cast<IPort0Type const&>(tuple);
  SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << ituple, "dpsop");
  std::stringstream buf;
  for(ConstTupleIterator ti=tuple.getBeginIterator(); ti!=tuple.getEndIterator(); ++ti)
  {
      ConstTupleAttribute attribute = *ti;
      std::string name = attribute.getName();
      ConstValueHandle handle = attribute.getValue();
      std::string temp = getStringValue(handle);
      buf << temp << ",";
  }
  size_t found;
  buf << "\n";
  OPort0Type outTuple;
  outTuple.set_buf(buf.str());
  submit(outTuple, 0);
}

// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {}

// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {}

std::string MY_OPERATOR::getStringValue(ConstValueHandle & handle) {
  char buf[128];
  switch(handle.getMetaType()) {
    case SPL::Meta::Type::INT64:{
        SPL::int64 v_int64= handle;
        if (v_int64 >= NUM_NUMSTRINGS)
          snprintf(buf, sizeof(buf), "%ld", v_int64);
        else
          strcpy(buf, numberCache[v_int64]);
        break;
      }
    case SPL::Meta::Type::INT32:{
        SPL::int32 v_int32= handle;
        if (v_int32 >= NUM_NUMSTRINGS)
          snprintf(buf, sizeof(buf), "%d", v_int32);
        else
          strcpy(buf, numberCache[v_int32]);
        break;
      }
    case SPL::Meta::Type::INT16:{
        SPL::int16 v_int16= handle;
        if (v_int16 >= NUM_NUMSTRINGS)
          snprintf(buf, sizeof(buf), "%d", v_int16);
        else
          strcpy(buf, numberCache[v_int16]);
        break;
      }
    case SPL::Meta::Type::FLOAT64: {
        SPL::float64 v_float64= handle;
        snprintf(buf, sizeof(buf), "%f", v_float64);
        break;
      }
    case SPL::Meta::Type::FLOAT32: {
        SPL::float32 v_float32= handle;
        snprintf(buf, sizeof(buf), "%f", v_float32);
        break;
      }
    case SPL::Meta::Type::RSTRING: {
        SPL::rstring v_rstring= handle;
        std::string s = handle;
        strcpy(buf, s.c_str());  // todo check len
        break;
      }
    default:
     sprintf(buf, "X");
  }  
return(buf);
}
<%SPL::CodeGen::implementationEpilogue($model);%>

清单 3 显示了 LoadNetezza 操作符的详细信息,它也设计为一个 C++ 原始操作符。

清单 3. LoadNetezza_cpp.cgt 代码
#include <stdio.h>
#include <string.h>
#include <unistd.h>
using namespace std;
<%SPL::CodeGen::implementationPrologue($model);%>
<%
  my $hostname   = $model->getParameterByName("hostname");
  my $database   = $model->getParameterByName("database");
  my $user       = $model->getParameterByName("user");
  my $password   = $model->getParameterByName("password");
  my $tablename  = $model->getParameterByName("tablename");
  my $pipelen    = $model->getParameterByName("pipelen");
  if($hostname) { $hostname = $hostname->getValueAt(0)->getSPLExpression();}
  if($database) { $database = $database->getValueAt(0)->getSPLExpression();}
  if($tablename){ $tablename = $tablename->getValueAt(0)->getSPLExpression();}
  if($user)     { $user = $user->getValueAt(0)->getSPLExpression();}
  if($password) { $password = $password->getValueAt(0)->getSPLExpression();}
  if($pipelen)  { $pipelen = $pipelen->getValueAt(0)->getSPLExpression();} 
%>
// Constructor
MY_OPERATOR::MY_OPERATOR() { xx = 0; zz = 0;}
// Destructor
MY_OPERATOR::~MY_OPERATOR() {}
// Notify port readiness
void MY_OPERATOR::allPortsReady() {
    // Notifies that all ports are ready. No tuples should be submitted before
    // this. Source operators can use this method to spawn threads.
     hostname  =  <%=$hostname%>;  
     userid    =  <%=$user%>;  
     password  =  <%=$password%>;  
     database  =  <%=$database%>;  
     tablename =  <%=$tablename%>;
     pipelen   =  <%=$pipelen%>;
     createThreads(1); // Create source thread   
}
// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown() {}
// Processing for source and threaded operators   
void MY_OPERATOR::process(uint32_t idx){
  SPCDBG(L_ERROR, "Processing uint tuple tuple from input port 0 " , "dpsop");
}
// Tuple processing for mutating ports 
void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {}
// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {  
   SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << tuple, "dpsop");
   if ( xx == 0 ) { 
      SPLLOG(L_ERROR," reset xx ","dpsop");
      if (pipe(pipes)) {
         printf("error creating pipe\n");
      }     
      int rc = fork(); // Fork/exec a nzload process
      switch(rc) {
      case -1: printf("Error on fork.\n");
      case  0: // child
        // set stdin to read off the pipe. then exec nzload
        if (-1 == dup2(pipes[0], 0)) { printf("dup error\n");}
        close(pipes[1]);
        execlp("nzload","nzload", "-host", hostname.c_str(), "-u", userid.c_str(),
               "-pw", password.c_str(), "-db", database.c_str(), "-t", tablename.c_str(), 
               "-delim", ",", NULL);
        printf("exec error\n");
      }  
      child = rc;
      close(pipes[0]);
   }   
   xx = xx + 1; 
   zz++;
   IPort0Type const & ituple = static_cast<IPort0Type const&>(tuple);
   SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << ituple, "dpsop");
   ConstTupleIterator ti=tuple.getBeginIterator();
   ConstTupleAttribute attribute = *ti;
   std::string buf = attribute.getValue(); 
   // Write the data values to the pipe
   write(pipes[1], buf.c_str(), buf.size()); 
   if ((0 == zz % pipelen) || (1 == zz))
      SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
   if (xx == pipelen ) {
      SPLLOG(L_ERROR, " ZZZ before waitpid wrote record: " << zz , "dpsop"  );
      close(pipes[1]);
      SPLLOG(L_ERROR, " ZZZ after waitpid wrote record: " << zz , "dpsop"  );
      xx = 0;
   }
}
// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {
   SPCDBG(L_ERROR, "Processing punctuation  tuple from input port 0 " , "dpsop");
   if(punct==Punctuation::WindowMarker) {
      close(pipes[1]);
      waitpid(child, NULL, 0);
      SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
      xx = 0;
   } else {
     if(punct==Punctuation::FinalMarker) {
        close(pipes[1]);
        waitpid(child, NULL, 0);
        SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
      }
   }    
}
<%SPL::CodeGen::implementationEpilogue($model);%>

在这里,您可一看到 LoadNetezza 打开了一个管道,将数据写入其中,直到收到一定量的数据或一个标点符号,然后关闭管道,执行 nzload 来将加入管道的批量数据传输到 Netezza。这样就建立了您的高吞吐量连接。

结束语

Streams 和 Netezza 是处理海量数据的优秀技术。Streams 可在很短的时间内(近实时地)读取、转换和转发海量数据。Netezza 设备加载这些数据集并存储它们,以用于分析。但是,InfoSphere Streams 2.0 标准数据库工具包所提供的默认 ODBC 操作符,无法最大限度发挥两个系统之间的高性能 Netezza 加载实用程序的优势。您需要使用批量加载功能 nzload,这是由 Netezza 客户端提供的功能。本文介绍了如何构造 C++ 原始操作符来使用该功能,以及如何从 SPL 调用这些操作符。对于未来的研究工作,InfoSphere Streams 3.0 所提供的数据库工具包中包含类似的操作符。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=949449
ArticleTitle=将 PureData System for Analytics 与 InfoSphere Streams 相集成
publish-date=10212013