Integrating PureData System for Analytics with InfoSphere Streams

Using Streams operators for effective mass data load to Netezza

This article describes how to perform bulk load from InfoSphere® Streams 2.0 to PureData™ System for Analytics N1001-010 using Netezza technology. The example InfoSphere Streams application demonstrates how the Netezza load utility enables a high-throughput connection, allowing both systems, working together, to reach the throughput each can offer separately.

Sascha Laudien (SaschaLaudien@de.ibm.com), IT Specialist, IBM

Sascha Laudien is an IT specialist for the IBM Data Warehousing Center of Excellence in Germany. He joined IBM in 2003, where he began expanding his areas of expertise in administration of DB2 databases, data warehousing, information integration, and analytical application development. Until 2009 he was part of the IBM Information Management services team, where he supported various customer projects in Germany using a wide variety of data warehousing and business intelligence capabilities. In the Data Warehousing Center of Excellence he currently runs proofs of concept and benchmarks with IBM Smart Analytics System.



Günter Hentschel (guenter.hentschel@de.ibm.com), Software Development Engineer, IBM

Günter Hentschel has worked as a software development engineer at IBM since 2007. He gained experience in application development with InfoSphere Streams, working on projects for the telecommunication providers Ufone Pakistan and Sprint USA, the German army, and a German solar technology provider.



Heike Leuschner (heike.leuschner@de.ibm.com), Advisory Software Engineer, IBM

Heike Leuschner is an advisory software engineer for IBM big data solutions. She joined IBM in 2007. Her expertise is in the analysis of requirements and the design of IBM InfoSphere BigInsights solutions.



04 June 2013

Also available in Chinese

InfoSphere Streams is a high-performance computing platform that enables continuous and extremely fast analysis of massive volumes of streaming data from multiple sources. The Netezza appliance loads these data sets and stores them for analysis by PureData System for Analytics. This scalable, massively parallel system enables clients to perform complex analytics on enormous data volumes.

However, the default ODBC operator delivered with the Streams 2.0 standard database toolkit cannot exploit the high-performance load path between the two systems. Instead, you need the bulk load utility nzload, which is delivered with the Netezza client. This article shows how to construct C++ primitive operators that use this utility and how to call those operators from Streams Processing Language (SPL).

Preparing the Netezza and Streams environment

We wanted to look at the interconnection between Streams and Netezza — and especially the usage of the high-performance load utility from Streams into the Netezza database. Our test environment included the PureData System for Analytics N1001-010 (powered by Netezza) and Streams 2.0 as a single-server installation. (For the Streams general installation directions, refer to the product documentation.) The default communication port of the Netezza appliance is 5480, which we used to establish the connection. Figure 1 shows our test environment.

Figure 1. Netezza/Streams connection
Illustration shows IBM PureData System for Analytics N1001-010 on the left and IBM InfoSphere Streams 2.0 on the right, connected by a 5480 communication port

Netezza preparation

One of Netezza's advantages is its simplicity. Consequently, you don't have to manage the underlying database layout, such as buffer pool and table space design. For the Netezza preparation, you only need to create a database: "create database <DB-Name>" and then define a table into which the data from Streams should be loaded:

"create table <TABLE-NAME>
      (col1 integer,
      col2 char(20),
      col3 timestamp)"

Streams preparation

To connect Netezza and Streams, and enable the speed of the high-performance Netezza load utility, you'll have to install the Netezza client on the Streams server. Download the Netezza client software. Look for Information Management > IBM Netezza NPS Software and Clients > NPS_7.0.0 > Linux.

To install the Netezza client:

  • Copy the downloaded Netezza client installation package to the Streams server (we used the Linux® package in V6.0.5P6).
  • Log in on the Streams server as root and change to the directory to which you copied the installation package.
  • Extract the package: gunzip nz-linuxclient-v6.0.5.p6.tar.gz, then tar -xvf nz-linuxclient-v6.0.5.p6.tar. These directories are included:
    • webadmin (Netezza Online Administration Client)
    • linux64/ (contains the 64-bit ODBC driver)
    • linux/ (contains the Netezza client and the 32-bit Netezza driver)
  • Change to the Linux directory for installing the Netezza client and unpack the client (using ./unpack).
  • Switch to the installation path and change to the bin directory (for example, cd /usr/local/nz/bin), then try to start the help pages of the Netezza client interfaces, such as the Netezza terminal (nzsql) and the Netezza load utility (nzload):
    nzsql -h
    nzload -h

    Both will be used within Streams.

  • If any libraries are missing, add their paths to the LD_LIBRARY_PATH environment variable (export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:{new_path}). Be sure to use 32-bit libraries, not 64-bit. Check that you set the path correctly with echo $LD_LIBRARY_PATH.
  • If setting LD_LIBRARY_PATH and verifying that you used the 32-bit version did not solve the missing-library issue (for example, libssl.so.4 is still missing), find the installed release of that library (libssl.so.0.9.8, for example) and create a symbolic link to it under the name of the missing release: ln -s libssl.so.0.9.8 libssl.so.4 and ln -s libcrypto.so.0.9.8 libcrypto.so.4.
  • Now the help page of Netezza terminal (nzsql) and Netezza loading utility (nzload) should work with these commands:
    /usr/local/nz/bin/nzsql -h
          This is nzsql, the IBM Netezza SQL interactive terminal.
          Usage: nzsql [options] [security options] [dbname [username] [password]] . . .

    /usr/local/nz/bin/nzload -h
          Usage: nzload [-h|-rev] [<options>]
          Options: . . .

    The Netezza communication port (5480) must be open between the Netezza server and the InfoSphere Streams server.
  • Additionally, you can add the directory path of the Netezza client binaries to the PATH environment variable to access the Netezza client applications globally on your shell: export PATH=$PATH:{path of Netezza client binaries}.

Streams implementation details

The Streams example application reads and analyzes XML file sources, enriches the data with lookups from other file sources, and loads it into Netezza.

Figure 2. Streams example application data flow
Illustration shows data flow; XML files on the left and lookup files at the top input into Streams; from Streams, data flows through nzload, to Netezza on the right

Streams applications are implemented in SPL using operators. Many operators already exist, but it is possible to write your own: composite operators in SPL for a higher abstraction level and primitive operators in C++ for special functionality. In our example application, we have two layers: a top-level layer of coarse-grain composite operators, and within each composite, a layer of fine-grain primitive operators.


Streams example application composites

Our example Streams application consists of five composites:

  1. XMLBlob reads the XML files.
  2. parsedMeasurements parses the XML structures received from XMLBlob.
  3. measurementsPerDevice validates and enriches the parsed data received from parsedMeasurements with the help of lookups.
  4. anomaliesDetector writes logs for detected bad values received from parsedMeasurements.
  5. dbWriter writes the data received from measurementsPerDevice to Netezza.

The Streams/Netezza connection, which you'll be focusing on, is part of the dbWriter composite. Figure 3 shows how the composites are related.

Figure 3. Streams application composites
Illustration shows five application composites

Streams/Netezza load operators

dbWriter uses 10 operators: two Custom, one ThreadedSplit, four PrepareNetezza, one Beacon, one Union, and one LoadNetezza. The enriched data from composite measurementsPerDevice goes into two operators at the same time: the first Custom and ThreadedSplit.

The first Custom operator is used just for logging. The ThreadedSplit operator distributes the data stream into the four parallel PrepareNetezza operators to speed up the preparation.

The PrepareNetezza operators transform the data tuples of mixed attribute types into pure comma-separated tuples of type rstring in a high-performance way. The prepared data is reconnected to one stream again with a Union operator. A Beacon operator helps generate a time trigger. The second Custom operator reads from Union and Beacon. It adds punctuation to the data stream coming from the Union operator whenever a time trigger is received from the Beacon operator.

Finally, the LoadNetezza operator opens a pipe to an nzload process, writes data into the pipe, and flushes the piped portion of data to Netezza whenever a certain number of entries have been written or a punctuation is received. The flush is triggered by closing the pipe. Figure 4 shows these flows.

Figure 4. Operators in the dbWriter composite
Architectural drawing shows operators in the dbWriter composites

Listing 1 shows the appropriate SPL code snippet of the composite shown in Figure 4.

Listing 1. dbWriter SPL code
 . . .
(stream<t_compactedValuesNetezza> records1;
 stream<t_compactedValuesNetezza> records2;
 stream<t_compactedValuesNetezza> records3; 
 stream<t_compactedValuesNetezza> records4) = ThreadedSplit(formattedCompactedValues){
          param
          bufferSize: 3000u;
   }

   stream <rstring buf> prepare1 = PrepareNetezza(records1) {}
   stream <rstring buf> prepare2 = PrepareNetezza(records2) {}
   stream <rstring buf> prepare3 = PrepareNetezza(records3) {}
   stream <rstring buf> prepare4 = PrepareNetezza(records4) {}

   stream <rstring buf> Pipe = Union(prepare1; prepare2; prepare3; prepare4 ) {}

   stream <rstring dummy> Punct = Beacon() {
          param
             period: $NETEZZA_tmclosepipe;
   }

   stream <rstring buf> punctPipe = Custom(Pipe;Punct) {
      logic
      onTuple Punct: submit(Sys.WindowMarker, punctPipe);
      onTuple Pipe:  submit(Pipe, punctPipe);
   }

() as writeRecord = LoadNetezza(punctPipe) {
  param
      hostname  : $NETEZZA_hostname;
      database  : $NETEZZA_database;
      user      : $NETEZZA_user;
      password  : $NETEZZA_password;
      tablename : $NETEZZA_tablename;
      pipelen   : $NETEZZA_pipelen;
 }
. . .

The values for the parameters hostname, database, user, password, tablename, and pipelen need to contain the appropriate content for the database connection. While hostname, database, user, password, and tablename are rstrings, pipelen is an int32 and may be, for example, 200000.

The original C++ operators

You can find the original C++ operators on developerWorks under Community > Files > Streams Operators (SPL) > Netezza.tar. For the example in Listing 3, LoadNetezza_cpp.cgt was enhanced with punctuation handling to provide both a time trigger and a record-count trigger, which is useful if the input stream is not continuous. In that case, the enhanced LoadNetezza ensures that data does not reside in the pipe longer than a certain time, because the pipe is flushed both on time (delivered by punctuation) and on record count.

Custom, ThreadedSplit, Union, and Beacon are standard SPL operators; PrepareNetezza and LoadNetezza are our own C++ primitive operators.

Listing 2 shows the implementation of the PrepareNetezza operator. It transforms the input tuples of mixed attribute types into a comma-separated rstring form. Since the PrepareNetezza operator is not part of the standard toolkit set of Streams 2.0, it was designed as a C++ primitive operator.

Listing 2. PrepareNetezza_cpp.cgt operator code
<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR(){
   // Initialize the numberCache 
   for (int i=0; i < NUM_NUMSTRINGS; i++)
     snprintf(numberCache[i], sizeof(numberCache[i]), "%d", i);
}

// Destructor
MY_OPERATOR::~MY_OPERATOR() {} 

// Notify port readiness
void MY_OPERATOR::allPortsReady() {}

// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown() {}

// Processing for source and threaded operators   
void MY_OPERATOR::process(uint32_t idx) {}
 
// Tuple processing for mutating ports 
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
  IPort0Type const & ituple = static_cast<IPort0Type const&>(tuple);
  SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << ituple, "dpsop");
  std::stringstream buf;
  for(ConstTupleIterator ti=tuple.getBeginIterator(); ti!=tuple.getEndIterator(); ++ti)
  {
      ConstTupleAttribute attribute = *ti;
      std::string name = attribute.getName();
      ConstValueHandle handle = attribute.getValue();
      std::string temp = getStringValue(handle);
      buf << temp << ",";
  }
  buf << "\n";
  OPort0Type outTuple;
  outTuple.set_buf(buf.str());
  submit(outTuple, 0);
}

// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {}

// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {}

std::string MY_OPERATOR::getStringValue(ConstValueHandle & handle) {
  char buf[128];
  switch(handle.getMetaType()) {
    case SPL::Meta::Type::INT64:{
        SPL::int64 v_int64= handle;
        if (v_int64 < 0 || v_int64 >= NUM_NUMSTRINGS) // negative values must not index the cache
          snprintf(buf, sizeof(buf), "%ld", v_int64);
        else
          strcpy(buf, numberCache[v_int64]);
        break;
      }
    case SPL::Meta::Type::INT32:{
        SPL::int32 v_int32= handle;
        if (v_int32 < 0 || v_int32 >= NUM_NUMSTRINGS) // negative values must not index the cache
          snprintf(buf, sizeof(buf), "%d", v_int32);
        else
          strcpy(buf, numberCache[v_int32]);
        break;
      }
    case SPL::Meta::Type::INT16:{
        SPL::int16 v_int16= handle;
        if (v_int16 < 0 || v_int16 >= NUM_NUMSTRINGS) // negative values must not index the cache
          snprintf(buf, sizeof(buf), "%d", v_int16);
        else
          strcpy(buf, numberCache[v_int16]);
        break;
      }
    case SPL::Meta::Type::FLOAT64: {
        SPL::float64 v_float64= handle;
        snprintf(buf, sizeof(buf), "%f", v_float64);
        break;
      }
    case SPL::Meta::Type::FLOAT32: {
        SPL::float32 v_float32= handle;
        snprintf(buf, sizeof(buf), "%f", v_float32);
        break;
      }
    case SPL::Meta::Type::RSTRING: {
        std::string s = handle;
        snprintf(buf, sizeof(buf), "%s", s.c_str()); // bounded copy avoids overflowing buf
        break;
      }
    default:
     snprintf(buf, sizeof(buf), "X");
  }  
return(buf);
}
<%SPL::CodeGen::implementationEpilogue($model);%>

Listing 3 shows the details of the LoadNetezza operator, which was also designed as a C++ primitive operator.

Listing 3. LoadNetezza_cpp.cgt code
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
<%SPL::CodeGen::implementationPrologue($model);%>
<%
  my $hostname   = $model->getParameterByName("hostname");
  my $database   = $model->getParameterByName("database");
  my $user       = $model->getParameterByName("user");
  my $password   = $model->getParameterByName("password");
  my $tablename  = $model->getParameterByName("tablename");
  my $pipelen    = $model->getParameterByName("pipelen");
  if($hostname) { $hostname = $hostname->getValueAt(0)->getSPLExpression();}
  if($database) { $database = $database->getValueAt(0)->getSPLExpression();}
  if($tablename){ $tablename = $tablename->getValueAt(0)->getSPLExpression();}
  if($user)     { $user = $user->getValueAt(0)->getSPLExpression();}
  if($password) { $password = $password->getValueAt(0)->getSPLExpression();}
  if($pipelen)  { $pipelen = $pipelen->getValueAt(0)->getSPLExpression();} 
%>
// Constructor
MY_OPERATOR::MY_OPERATOR() { xx = 0; zz = 0;}
// Destructor
MY_OPERATOR::~MY_OPERATOR() {}
// Notify port readiness
void MY_OPERATOR::allPortsReady() {
    // Notifies that all ports are ready. No tuples should be submitted before
    // this. Source operators can use this method to spawn threads.
     hostname  =  <%=$hostname%>;  
     userid    =  <%=$user%>;  
     password  =  <%=$password%>;  
     database  =  <%=$database%>;  
     tablename =  <%=$tablename%>;
     pipelen   =  <%=$pipelen%>;
     createThreads(1); // Create source thread   
}
// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown() {}
// Processing for source and threaded operators   
void MY_OPERATOR::process(uint32_t idx){
  SPCDBG(L_ERROR, "Processing uint tuple from input port 0 " , "dpsop");
}
// Tuple processing for mutating ports 
void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {}
// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {  
   SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << tuple, "dpsop");
   if ( xx == 0 ) { 
      SPLLOG(L_ERROR," reset xx ","dpsop");
      if (pipe(pipes)) {
         printf("error creating pipe\n");
      }     
      int rc = fork(); // Fork/exec a nzload process
      switch(rc) {
      case -1:
        printf("Error on fork.\n");
        break;
      case  0: // child
        // set stdin to read off the pipe. then exec nzload
        if (-1 == dup2(pipes[0], 0)) { printf("dup error\n");}
        close(pipes[1]);
        execlp("nzload","nzload", "-host", hostname.c_str(), "-u", userid.c_str(),
               "-pw", password.c_str(), "-db", database.c_str(), "-t", tablename.c_str(), 
               "-delim", ",", NULL);
        printf("exec error\n");
        _exit(1); // do not fall back into the Streams process if exec fails
      }  
      child = rc;
      close(pipes[0]);
   }   
   xx = xx + 1; 
   zz++;
   IPort0Type const & ituple = static_cast<IPort0Type const&>(tuple);
   SPCDBG(L_DEBUG, "Processing tuple from input port 0 " << ituple, "dpsop");
   ConstTupleIterator ti=tuple.getBeginIterator();
   ConstTupleAttribute attribute = *ti;
   std::string buf = attribute.getValue(); 
   // Write the data values to the pipe
   write(pipes[1], buf.c_str(), buf.size()); 
   if ((0 == zz % pipelen) || (1 == zz))
      SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
   if (xx == pipelen ) {
      SPLLOG(L_ERROR, " ZZZ before waitpid wrote record: " << zz , "dpsop"  );
      close(pipes[1]);
      waitpid(child, NULL, 0); // reap the finished nzload before the next batch
      SPLLOG(L_ERROR, " ZZZ after waitpid wrote record: " << zz , "dpsop"  );
      xx = 0;
   }
}
// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {
   SPCDBG(L_ERROR, "Processing punctuation from input port 0 " , "dpsop");
   if (punct==Punctuation::WindowMarker || punct==Punctuation::FinalMarker) {
      close(pipes[1]);
      waitpid(child, NULL, 0);
      SPLLOG(L_ERROR, " ZZZ wrote record: " << zz , "dpsop"  );
      xx = 0;
   }
}
<%SPL::CodeGen::implementationEpilogue($model);%>

Here, you can see that LoadNetezza opens a pipe to a forked nzload process, writes data into it until a certain amount of data has been written or a punctuation arrives, and then closes the pipe so that nzload completes the transfer of the piped bulk of data to Netezza. Your high-throughput connection is established.


Conclusion

Streams and Netezza are great technologies for handling huge amounts of data. Streams reads, transforms, and forwards massive amounts of data in a short time frame (near real time). The Netezza appliance loads these data sets and stores them for analysis. However, the default ODBC operator delivered with the InfoSphere Streams 2.0 standard database toolkit cannot exploit the high-performance Netezza load utility between the two systems. Instead, you need the bulk load utility nzload, which is delivered with the Netezza client. This article showed how to construct C++ primitive operators that use this utility and how to call those operators from Streams Processing Language. Looking ahead, similar operators are part of the database toolkit that comes with InfoSphere Streams 3.0.

Resources

Get products and technologies

  • Get a trial version of InfoSphere Streams and build applications that rapidly ingest, analyze, and correlate information as it arrives from thousands of real-time sources.
  • Get a trial version of IBM InfoSphere BigInsights and manage and analyze massive volumes of structured and unstructured data at rest.
  • Build your next development project with IBM trial software, available for download directly from developerWorks.

