Integrating SPSS Model Scoring in InfoSphere Streams, Part 1: Calling Solution Publisher from an InfoSphere Streams operator

Use data modeling with your streaming data application for actionable business intelligence

Learn how to leverage the predictive power of SPSS in a real-time scoring environment. Part 1 of this "Integrating SPSS Model Scoring in InfoSphere Streams" series describes how to write and use an IBM® InfoSphere® Streams operator to execute an IBM SPSS Modeler predictive model in an InfoSphere Streams application, using the IBM SPSS Modeler Solution Publisher Runtime Library API.

Mike Koranda (koranda@us.ibm.com), InfoSphere Streams Release Manager, IBM China

Mike Koranda is a senior technical staff member in IBM's Software Group and has been working at IBM for more than 30 years. He has worked on the development of the InfoSphere Streams product for the past six years.



13 October 2011


Before you start

About this series

InfoSphere Streams is a platform that enables real-time analytics of data in motion. The IBM SPSS family of products provides the ability to build predictive analytic models. This "Integrating SPSS Model Scoring in InfoSphere Streams" series is for Streams developers who need to leverage the powerful predictive models in a real-time scoring environment.

About this tutorial

This tutorial describes how to create an InfoSphere Streams operator that can be used from Streams applications to execute SPSS predictive models. It provides a sample operator and data that demonstrate this integration, then describes how the sample can be adjusted for use with any appropriate SPSS model. In Part 2, you learn how this non-generic operator is extended to use the predictive model's XML metadata, allowing any SPSS predictive model to be used in Streams without requiring C++ skills to customize the operator.

Objectives

In this tutorial, learn what a data analyst needs to do in SPSS Modeler to prepare a predictive model for scoring in Streams, see how a Streams component developer can build an operator to execute that model, and learn how a Streams application can use that operator to produce real-time scored results from streaming data.

Prerequisites

This tutorial is written for Streams component developers and application programmers who have Streams programming language skills and C++ skills. Use the tutorial as a reference, or examine and execute its samples to see the techniques in action. To execute the samples, you should have a general familiarity with using a UNIX® command-line shell and a working knowledge of Streams programming.

System requirements

To run the examples, you need a Red Hat Enterprise Linux® box with InfoSphere Streams V2.0 or later and IBM SPSS Modeler Solution Publisher 14.2 fixpack 1, plus the Solution Publisher hot fix, which is scheduled to be available 14 Oct 2011.


Overview

Why integrate InfoSphere Streams and SPSS?

IBM SPSS Modeler provides a state-of-the-art environment for understanding data and producing predictive models. InfoSphere Streams provides a scalable, high-performance environment for real-time analysis of data in motion, ranging from traditional structured and semi-structured data to unstructured data types. Some applications need both: deep analytics built against stored information, and low-latency, high-volume, real-time application of those analytics to provide scoring.

Roles and terminology

Before going further, it will be useful to describe a few roles and their responsibilities, and to present some terminology that will be used throughout the tutorial.

Roles

  • Data analyst: A modeling expert who knows how to use the IBM SPSS Modeler tools to build and publish predictive models.
  • Streams application developer: An InfoSphere Streams developer responsible for building applications.
  • Streams component developer: An InfoSphere Streams developer responsible for developing components that a Streams Application Developer uses.

The focus of this tutorial is on the Streams component developer role and how to write an operator that can execute SPSS predictive models. The other roles are included as they are needed to understand the work and interaction necessary for the overall solution. Also note that in many cases, the Streams application developer and Streams component developer are the same person.

Terminology

  • Predictive model: A prepared scoring branch of an SPSS Modeler stream, sometimes referred to simply as a model.
  • Streams Processing Language (SPL): The language used to write InfoSphere Streams applications.
  • Operator: The basic building-block component that InfoSphere Streams applications use. There is a set of operators shipped with the Streams product, and customers can write their own custom operators.

NOTE: There is potential confusion with the overloading of the term "streams." The InfoSphere Streams product refers to streams of data and Streams applications built using SPL. The SPSS Modeler product creates a workflow of connected Modeler components known as a "stream." For the purposes of this tutorial, SPSS Modeler streams will be referred to as predictive analytics or models, and the term "stream" will mean an InfoSphere Streams data stream.

Sample scenario

The following is an overall flow for integrating model development with real-time scoring:

  1. A data analyst determines what attributes and model are needed.
  2. A Streams component developer produces the operator that takes those attributes as input and produces the desired output.
  3. A Streams application developer builds the application that obtains the attributes, calls the operator, and takes action based on the resulting scores.

This is typically an iterative process, with discussion of which attributes are needed from among all those available in the planned Streams flow. We will work through a sample scenario in which a data analyst evaluates historical data to produce a useful scoring model and passes it on to the Streams component developer, who builds a suitable operator to execute the model in a Streams environment. The Streams application developer will then produce a trivial Streams application to validate the model.

Next, we will describe the responsibility of the data analyst in preparing the models to be scored.


Building the predictive models

Determining what data is available, what models are appropriate, and building those models is typically done by a data analyst.

This tutorial builds on the Market Basket Analysis demo and tutorial shipped with the SPSS Modeler Client. We will show how the model scoring stream built in Modeler can be published and used in an InfoSphere Streams application to enable real-time scoring and decision-making that leverages an SPSS model's logic.

The Market Basket Analysis example deals with fictitious data describing the contents of supermarket baskets (collections of items bought together) plus the associated personal data of the purchaser, which might be acquired through a loyalty card program. The goal is to discover groups of customers who buy similar products and can be characterized demographically, such as by age, income, etc.

This example reveals how IBM SPSS Modeler can be used to discover affinities, or links, in a database, both by modeling and by visualization. These links correspond to groupings of cases in the data, and these groups can be investigated in detail and profiled by modeling. In the retail domain, such customer groupings might, for example, be used to target special offers to improve the response rates to direct mailings or to customize the range of products stocked by a branch to match the demands of its demographic base.

The building and analyzing of models is beyond the scope of this tutorial. Please refer to the material provided with the SPSS Modeler Client for more information on the data analyst role and modeling in general.

This tutorial starts from a working modeler session as shown in Figure 1.

Figure 1. Basket Rule modeler client
Image shows the Basket Rule STR in modeler client

In order to use the predictive model built in this session, you need to create a scoring branch by connecting an input node to the scoring nugget and connecting the scoring nugget to an output flat file, as shown below. Note that this branch in our example is quite trivial and not representative of the typical complexity of real modeling applications. It relies on two input fields, gender and income, to predict whether a customer is likely to purchase a combination of beer, beans, and pizza.

Figure 2. Basket Rule scoring branch
Image shows the Basket Rule scoring branch in modeler client

We have created a sample input file to test the execution of the model.

Figure 3. Basket Rule sample input file
Image shows the Basket Rule sample input file data in modeler client

Running the branch from within Modeler using the sample set of user inputs produces the following file. This sample data can be used to validate the scoring branch in the SPSS Modeler Workbench. It will also be used to validate the streams operator and the application we will build later.

Figure 4. Basket Rule sample output file
Image shows the Basket Rule sample output file data in Excel

It is probably best to save the modified stream above after the scoring branch has been created and then use a temporary Export node to publish the artifacts necessary to use the scoring model in the Streams operator. In the Modeler session, change the output file setting to something similar to what is shown below and click Publish.
NOTE: Be sure to specify that the metadata should be published. This produces an XML document that describes the inputs and outputs of the model, including their field names and field types. This is the information the Streams component developer needs in order to build the operator that calls this predictive model.

Figure 5. Basket Rule publish dialog
Image shows the Basket Rule publish dialog in modeler client

The completed modeler workbench session is provided in the streamsbasketrule.str file (see Download).

The contract between the data analyst and the Streams component developer

In order to write the Streams operator, the Streams component developer needs to know certain information, such as the inputs and outputs of the predictive model. Specifically, the operator developer will require:

  • The install location of Solution Publisher.
  • The .pim and .par files produced during the publish.
  • The input source node key name. This can be found in the XML fragment:
    <inputDataSources>
    <inputDataSource name="file0" type="Delimited">

    NOTE: While there is no technical limitation, our example is limited to supporting a single input source for simplicity.

  • The input field names and storage, and their order as found in the <inputDataSource> tag:
    Listing 1. Input field names as found in inputDataSource tag
    <fields>
      <field storage="string" type="flag">
        <name>sex</name>
      </field>
      <field storage="integer" type="range">
        <name>income</name>
      </field>
    </fields>
  • The output source node key name. This can be found in the XML fragment in Listing 2.
    Listing 2. Output source node key name as found in XML fragment
    <outputDataSources>
    <outputDataSource name="file3" type="Delimited">

    NOTE: While there is no technical limitation, our example is limited to supporting a single output source for simplicity.

  • The output field names and storage, and their order as found inside the <outputDataSource> tag.
    Listing 3. Output field names as found in outputDataSource tag
    <fields>
     <field storage="string" type="flag">
      <name>sex</name>
     </field>
     <field storage="integer" type="range">
      <name>income</name>
     </field>
     <field storage="string" type="flag">
      <name>$C-beer_beans_pizza</name>
      <flag>
       <true>
        <value>T</value>
       </true>
       <false>
        <value>F</value>
       </false>
      </flag>
     </field>
     <field storage="real" type="range">
      <name>$CC-beer_beans_pizza</name>
      <range>
       <min>
        <value>0.0</value>
       </min>
       <max>
        <value>1.0</value>
       </max>
      </range>
     </field>
    </fields>

The data analyst has also informed the Streams component developer that the model does not modify the input parameters, even though they are listed as outputs on the model. While not critical, this information will allow the operator writer to optimize by not recopying those fields to the output tuple.

Now that we have the model published and the information necessary to write the operator, the next section covers how the Streams component developer goes about producing the operator.


Writing the operator

Preparing to write the operator

Since SPSS Modeler runs on Windows® and Streams runs on Linux, it is a good idea to validate that the Linux environment is properly set up to execute the .pim file provided by the data analyst. For our sample scenario, since we provided a sample input file and know the expected output, we can verify this with the stand-alone IBM SPSS Modeler Runtime modelerrun script provided as part of Solution Publisher.

NOTE: The SPSS hot fix that enables scoring through Streams is not compatible with modelerrun. Once the hot fix is applied, the modelerrun program will no longer work. To perform the validation described below, either do it before installing the hot fix, or install Solution Publisher in two different locations and apply the hot fix only to the installation used with Streams.

To run the modelerrun script on our sample data and model, unzip the sample file (see Download) to a location on a Linux system where you have Solution Publisher installed. Change directory to the data directory under the baskrule directory and execute the modelerrun script from the Solution Publisher install location, as shown below.

Listing 4. modelerrun script
bash-3.2$ cd /homes/hny1/koranda/baskruletest1/baskrule/data
bash-3.2$ ~koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2/modelerrun \
        -p baskrule.par baskrule.pim
IBM SPSS Modeler Runtime 14.2
(C) Copyright IBM Corp. 1994, 2011
bash-3.2$

Then view the results.

Listing 5. Results from running script
bash-3.2$ cat baskrule.csv 
sex,income,$C-beer_beans_pizza,$CC-beer_beans_pizza
F,10000,F,0.988327
F,20000,F,0.989645
F,15000,F,0.988327
F,25000,F,0.989645
F,10000,F,0.988327
F,20000,F,0.989645
F,15000,F,0.988327
F,25000,F,0.989645
M,10000,T,0.838323
M,20000,F,0.990964
M,15000,T,0.838323
M,25000,F,0.990964
M,10000,T,0.838323
M,20000,F,0.990964
M,15000,T,0.838323
M,25000,F,0.990964
bash-3.2$

Once you have validated that the Solution Publisher environment is working, you can move on to creating the operator.

Operator characteristics

The Streams component developer takes the information provided by the data analyst and builds an operator. In our sample scenario, this operator needs to accept a single tuple that contains two attributes:

  • Gender as a string of "M" or "F"
  • Income as an integer value

This will produce a single output tuple, passing through all input tuple attributes and adding the following attributes:

  • Prediction string containing "T" or "F"
  • Confidence (float)

We accomplish this by writing a non-generic operator (an operator that only works on this specific tuple/stream format) that uses the Solution Publisher API to execute the .pim file. More information about the Solution Publisher API is provided in the Resources section.

Outline of an operator

The general outline of an operator using the Solution Publisher API is as follows (a skeletal sketch appears after the list):

  1. In the operator's initialization code:
    1. Initialize the Solution Publisher clemrtl API library.
    2. Open the model image .pim file and receive an image handle.
    3. Get the input and output field counts and types from the image; these are required for the alternative input and output calls.
    4. Define a function that will be used to map from the input tuple attributes to the input fields required in the model and register it in the image.
    5. Define a function that will be used to map from the output fields produced by the model to the output tuple attributes and register it in the image.
    6. Since the same image is to be executed multiple times without changing parameters, prepare the image once during initialization.
  2. In the operator's process method (called for each input tuple):
    1. Populate the data structure registered for the alternative input from the input tuple attributes.
    2. Execute the image.
    3. Populate the output tuple attributes from the data structure registered for the alternative output.
  3. In the operator's prepareToShutdown method:
    1. Close the image.
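
To make the flow concrete, the following sketch (not the shipped sample) shows how these steps map onto the generated operator methods. The member names, the process signature, and the omission of all error handling are simplifications; the real code appears in the listings that follow.

// Sketch only: operator lifecycle, assuming the clemrtl entry points and
// the image_handle, myBuf, and myOutField members exist as in the sample.
MY_OPERATOR::MY_OPERATOR() {
    clemrtl_initialise_ext(0, arg_count, args);               // step 1a
    clemrtl_openImage("baskrule.pim", "baskrule.par",
                      &image_handle);                         // step 1b
    clemrtl_getFieldCount(image_handle, key, &fld_cnt);       // step 1c
    clemrtl_getFieldTypes(image_handle, key, fld_cnt, fld_types);
    clemrtl_setAlternativeInput(image_handle, key, fld_cnt, fld_types,
        MY_OPERATOR::next_record, (void*) &myBuf);            // step 1d
    clemrtl_setAlternativeOutput(image_handle, keyOut, fld_cnt_out,
        fld_types_out, MY_OPERATOR::next_record_back,
        (void*) &myOutField);                                 // step 1e
    clemrtl_prepare(image_handle);                            // step 1f
}

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
    // step 2a: copy tuple attributes into the registered input buffer
    clemrtl_execute(image_handle);                            // step 2b
    // step 2c: build and submit output tuples from the captured rows
}

void MY_OPERATOR::prepareToShutdown() {
    clemrtl_closeImage(image_handle);                         // step 3
}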

We'll now walk through each area, highlighting the code required in the Streams operator artifacts. The complete listing for all these artifacts is included in the baskrule/MyOp/MyOp.xml, baskrule/MyOp/MyOp_h.cgt, and baskrule/MyOp/MyOp_cpp.cgt files in the baskrule.zip file.

Operator model

In the operator's XML model definition's dependencies section, the Solution Publisher code location must be specified.

Listing 6. Dependencies section
<libraryDependencies>
  <library>
    <cmn:description>spss-lib</cmn:description>
    <cmn:managedLibrary>
      <cmn:libPath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2</cmn:libPath>
      <cmn:includePath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2/clemrtl/include</cmn:includePath>
    </cmn:managedLibrary>
  </library>
</libraryDependencies>

The complete operator model can be found in baskrule/MyOp/MyOp.xml.

Operator initialization code

Prior to calling any of the Solution Publisher APIs, the operator needs to dynamically load the Solution Publisher library and resolve the entry points it will call. For our operator, this requires the following type and member definitions in the _h.cgt file (see baskrule/MyOp/MyOp_h.cgt for the complete list of entry points defined).

Listing 7. Definitions in _h.cgt file
typedef int (*clemrtl_initialise_ext_t)(unsigned, int, void*);
...
void* libclemertl;
clemrtl_initialise_ext_t clemrtl_initialise_ext;

Then in the operator's constructor in the _cpp.cgt file, the library is loaded and each entry point address is resolved.

Listing 8. Loading the library in the _cpp.cgt file
libclemertl = dlopen("libclemrtl.so", RTLD_NOW | RTLD_DEEPBIND);
if (!libclemertl)
    throw SPLRuntimeOperatorException(getThisLine(), dlerror());
/* get the routine addresses */
clemrtl_initialise_ext = (clemrtl_initialise_ext_t)
    dlsym(libclemertl, "clemrtl_initialise_ext");
if (!clemrtl_initialise_ext)
    throw SPLRuntimeOperatorException(getThisLine(), dlerror());

This loading code is the same for any operator that calls Solution Publisher and does not need to change for different models or data.
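
Because the header declares many entry points, the dlsym-and-check pattern repeats for each one. A small helper, shown here for illustration only (it is not part of the sample), could reduce that repetition:

#include <dlfcn.h>
#include <stdexcept>
#include <string>

// Illustration only: resolve one clemrtl entry point, failing fast if
// the symbol is missing from the loaded library.
template <typename FnT>
static FnT loadSym(void* lib, const char* name) {
    FnT fn = reinterpret_cast<FnT>(dlsym(lib, name));
    if (!fn)
        throw std::runtime_error(std::string(name) + ": " + dlerror());
    return fn;
}

// usage, assuming libclemertl was opened with dlopen as above:
// clemrtl_initialise_ext =
//     loadSym<clemrtl_initialise_ext_t>(libclemertl, "clemrtl_initialise_ext");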

Initialize the Solution Publisher clemrtl API library

The first Solution Publisher API call initializes the library using clemrtl_initialise_ext(). The initialize call requires the Solution Publisher installation directory as a parameter. For this, we have created a parameter on the operator that can be specified in the .spl file; the default is the standard Solution Publisher install location. This can be seen in the baskrule/MyOp/MyOp_cpp.cgt file.

Listing 9. baskrule/MyOp/MyOp_cpp.cgt file
rstring installationDirectory = "/opt/IBM/SPSS/ModelerSolutionPublisher/14.2";
    if(hasParameter("SP_Install"))
      installationDirectory = getParameter("SP_Install");
    SPLLOG(L_INFO, "About to clemrtl initialise using SP_Install of: "<< 
        installationDirectory, "MPK");
    clemrtl_init_arg args[] = {
      {"installation_directory", installationDirectory.c_str()},
    };
    const int arg_count = sizeof args / sizeof args[0];
    if (clemrtl_initialise_ext(0,arg_count,args) != CLEMRTL_OK) {
        SPLLOG(L_ERROR, "Clemrtl initialise failed", "MPK");
    }

Open an image

Open an image using clemrtl_openImage() and receive an image handle. This API requires the location of the .pim and .par files provided by the data analyst. Note that we have hard-coded the .pim and .par file names and expect them to be in the data directory. A more general solution would be to pass these as parameters to the operator, similar to the way the Solution Publisher install location is specified.

Listing 10. Opening the image
/* open the image */
int res, status = EXIT_FAILURE;
image_handle = 0;

res = clemrtl_openImage("baskrule.pim", "baskrule.par", &image_handle);
if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Open Image Failed", "MPK");
    displayError(res, 0);
}

The displayError routine obtains additional detail about the specific error and then ends the operator by throwing a runtime exception.
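
The complete routine ships in MyOp_cpp.cgt in the download. A minimal sketch of its shape, assuming only what is stated above (log the problem, then throw), might look like this; the handle parameter's type is assumed here to match the sample's image_handle member:

// Minimal sketch (the shipped displayError also queries Solution
// Publisher for detail about the error before throwing).
void MY_OPERATOR::displayError(int res, clemrtl_image_handle handle) {
    SPLLOG(L_ERROR, "Solution Publisher call failed, rc=" << res, "MPK");
    throw SPLRuntimeOperatorException(getThisLine(),
                                      "Solution Publisher call failed");
}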

Get the input and output field information

Use clemrtl_getFieldCount() and clemrtl_getFieldTypes() to obtain information about the input and output fields and their types. Note that the key and keyOut fields must contain the values provided by the data analyst from the XML metadata. In our sample, the names from <inputDataSource name="file0" type="Delimited"> and <outputDataSource name="file3" type="Delimited"> must be used.

Listing 11. Getting input and output field information
/* Get input field count and types */
char* key = "file0";
res = clemrtl_getFieldCount(image_handle, key, &fld_cnt);
if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Get Field Count Failed", "MPK");
    displayError(res, image_handle);
}
SPLLOG(L_INFO, "Field Count is: " << (int)fld_cnt, "MPK");

int fld_types[100]; // needs to be bigger if more than 100 fields expected
res = clemrtl_getFieldTypes(image_handle, key, fld_cnt, fld_types);
if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Get Field Types Failed", "MPK");
    displayError(res, image_handle);
}
field_proc(fld_cnt, fld_types);

/* Get output field count and types */
size_t fld_cnt_out;
char* keyOut = "file3";
res = clemrtl_getFieldCount(image_handle, keyOut, &fld_cnt_out);
if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Get Output Field Count Failed", "MPK");
    displayError(res, image_handle);
}
SPLLOG(L_INFO, "Output Field Count is: " << (int)fld_cnt_out, "MPK");

int fld_types_out[100]; // needs to be bigger if more than 100 fields expected
res = clemrtl_getFieldTypes(image_handle, keyOut, fld_cnt_out, fld_types_out);
if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Get Output Field Types Failed", "MPK");
    displayError(res, image_handle);
}
field_proc(fld_cnt_out, fld_types_out);

Define the input function

The Solution Publisher API provides the ability to define an input iterator function that will be called during the execute process to provide the data used in the model. Our function will be used to map from the input tuple attributes to the input fields required in the model. We register it in the image using clemrtl_setAlternativeInput(). When you register this, it requires that you pass the address of the input field structure that contains pointers to each input field.

Listing 12. Defining the input function
void** MY_OPERATOR::next_record(void* arg) {
    SPLLOG(L_INFO, "In next_record iterator", "MPK");
    return *((buffer*) arg)->next_row++;
}

The structure and pointer for the input data are defined in the header _h.cgt file.

Listing 13. Structure and pointer for input data
typedef struct {
    void** row[2];
    void*** next_row;
} buffer;
...
buffer myBuf;

In our case, we hard-coded the row array size of 2: the first entry handles the single input tuple being processed, and the second entry contains NULL to indicate that there are no more rows.

The memory for the input row is an array of void* pointers, one per input field as determined from the XML metadata file; its size should match the number of input fields returned from the earlier call to clemrtl_getFieldCount(image_handle, key, &fld_cnt).

Listing 14. Initializing the buffer
/* initialize the buffer */
    void* inPointers[2];
    myBuf.row[0] = (void**) &inPointers;
    myBuf.row[1] = NULL;
    myBuf.next_row = myBuf.row;

The setAlternativeInput call then passes the key corresponding to the model input section, the input field count and types retrieved earlier, the function address, and the address of the input data structure.

Listing 15. setAlternativeInput call
res = clemrtl_setAlternativeInput(image_handle, key, fld_cnt, fld_types, 
     MY_OPERATOR::next_record, (void*) &myBuf );
if (res != CLEMRTL_OK) {
  status = EXIT_FAILURE;
  SPLLOG(L_ERROR, "Set Alternative Input Failed", "MPK");
  displayError(res, image_handle);
}
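
During clemrtl_execute(), Solution Publisher drives the registered iterator, consuming rows until the function returns NULL (which is why the second row entry is NULL). Conceptually the consumption looks like the following sketch; the actual loop is internal to Solution Publisher and is not public:

// Conceptual sketch only: how the registered input iterator is consumed
// during clemrtl_execute. The real loop is internal to Solution Publisher.
void** row;
while ((row = MY_OPERATOR::next_record((void*) &myBuf)) != NULL) {
    // each non-NULL row is fed to the model as one input record
}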

Define the output function

The Solution Publisher API provides the ability to define an output iterator function that will be called during the execute process to capture the data produced by the model. Our function maps from the output fields produced by the model to the output tuple attributes. It is registered in the image using clemrtl_setAlternativeOutput(). Registration lets you pass an object that is handed opaquely to the function each time it is called during execute. In our case, we pass a structure that maintains a linked list of output buffers, one created during each iteration of the function. Later, this linked list is used to move the output field data into output tuples that are sent on the output port.

Listing 16. Output function
void MY_OPERATOR::next_record_back(void* arg, void** row) {
    outField* ofp = (outField*) arg;

    outBuffer* obp = new outBuffer;
    obp->next = 0;
    if (ofp->head == 0) { // first call
        ofp->head = obp;
        ofp->tail = obp;
    } else {
        (ofp->tail)->next = obp;
        ofp->tail = obp;
    }

    if (row[0]) {
        obp->sex = (const char *) row[0];
        obp->_missing_sex = false;
    } else { obp->_missing_sex = true; }
    if (row[1]) {
        obp->income = *((long long *) row[1]);
        obp->_missing_income = false;
    } else { obp->_missing_income = true; }
    if (row[2]) {
        obp->prediction = (const char *) row[2];
        obp->_missing_prediction = false;
    } else { obp->_missing_prediction = true; }
    if (row[3]) {
        obp->confidence = *((double *) row[3]);
        obp->_missing_confidence = false;
    } else { obp->_missing_confidence = true; }
}

The structure for the output data from the model is defined in the header file.

Listing 17. Structure for output data
typedef struct {
  void* next;
  const char* sex;
  boolean _missing_sex;
  long long income;
  boolean _missing_income;
  const char* prediction;
  boolean _missing_prediction;
  double confidence;
  boolean _missing_confidence;
} outBuffer;

Defining this structure is where the information provided by the data analyst about the output fields in the XML metadata comes into play. The storage value determines the type of data in the structure and the code necessary to move it from the returned output row into the structure's memory. Also, the order of the fields in the structure and in the returned row follows the order in the XML metadata provided by the data analyst.

In our output iteration function, we test to make sure a value was produced by the model. A NULL address returned for a field indicates a missing value. We capture the fact that the value is missing in the corresponding _missing_xxx field defined in our structure. Later, this is used when populating the output tuple.

The setAlternativeOutput call passes the key corresponding to the model output section, the output field count and types retrieved earlier, the function address, and the pointer to the structure that maintains the linked list used to manage the data produced by the model.

Listing 18. Alternative output
/* Set the alternative output */
res = clemrtl_setAlternativeOutput(image_handle, keyOut, fld_cnt_out, fld_types_out, 
     MY_OPERATOR::next_record_back, (void*) &myOutField );
if (res != CLEMRTL_OK) {
  status = EXIT_FAILURE;
  SPLLOG(L_ERROR, "Set Alternative Output failed", "MPK");
  displayError(res, image_handle);
}

Prepare the image

Since the same image is to be executed multiple times without changing parameters, we call clemrtl_prepare() once during initialization, which results in less processing during the execution for each tuple.

Listing 19. Preparing the image
/* prepare the image */
res = clemrtl_prepare(image_handle);
if (res != CLEMRTL_OK) {
  status = EXIT_FAILURE;
  SPLLOG(L_ERROR, "Prepare Failed", "MPK");
  displayError(res, image_handle);
}

Operator process method

Populate the input fields

As each input tuple arrives, we handle it in the operator's process method. First we populate the data structure registered during setAlternativeInput() from the input tuple attributes. Note that this is where the information provided by the data analyst in the XML metadata related to the input fields comes into play. The storage value determines the type of data to be placed in the structure, and the attributes on the stream need to match the storage required by the model. In our sample, the tuple attribute s_sex is defined as an rstring. Once retrieved from the input tuple, the .c_str() method is used to get the underlying C string reference. This matches the storage expected by the first input field, as described in <field storage="string" type="flag"><name>sex</name>.

Likewise, the tuple attribute s_income is defined as an int64. Once retrieved, a reference to the underlying storage is obtained using the attribute's getter. This matches the storage expected by the second input field, as described in <field storage="integer" type="range"><name>income</name>. Note also that the order of the fields populated in the row follows the order in the XML metadata provided by the data analyst.

Listing 20. Fields to be populated
myBuf.row[0][0] = (void*) (tuple.get_s_sex().c_str());
myBuf.row[0][1] = (void*) &(tuple.get_s_income());

myBuf.next_row = myBuf.row; // set to point to the new data

Execute the image

Execute the image using clemrtl_execute(). Note that we use the same displayError routine (which throws an exception ending the operator), so if an error occurs during execution, the operator terminates and no more scoring occurs. It might be more appropriate to log the failure for a single tuple and ignore it. Alternatively, we could report the error on an additional output port dedicated to error handling, or set an additional attribute indicating that the model execution failed and the results are invalid. But for our simple example, we'll go with ending the operator.

Listing 21. Executing the image
/* execute the image */
  res = clemrtl_execute(image_handle);
  if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Execute Failed", "MPK");
    displayError(res, image_handle);
  }
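
As noted above, ending the operator is only one policy. A sketch of the log-and-continue alternative, assuming this code runs directly inside the process method, would be:

/* Alternative sketch: skip the failing tuple instead of ending the
   operator; nothing is submitted for it, and the next call to the
   process method scores the next tuple. */
res = clemrtl_execute(image_handle);
if (res != CLEMRTL_OK) {
    SPLLOG(L_ERROR, "Execute failed; dropping this tuple, rc=" << res, "MPK");
    return;
}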

After successful execution of the model, myOutField contains the head of the linked list of output data returned. This list is traversed and for each element, the output tuple attributes are populated from the data copied during the output iterator function. The tuple will then be submitted on the output port. You can see that if no output was produced by the model (linked list is empty), no output tuple will be sent. If the model produces multiple sets of output values for a single input, then multiple tuples will be sent on the output stream. Again, the types defined for these attributes must match the types in the outBuffer structure as derived from the XML metadata. In our sample, we defined the attributes as rstring predLabel, float64 confidence.

The indication of a missing value will result in the tuple's output attribute being left at its default value after the clear. Note that these defaults may actually be part of the domain of values the model could return, so different values or different processing might be necessary depending on your model.

Listing 22. Processing the list of output data
outBuffer* currentOutBuf = (outBuffer*)(myOutField.head);
while (currentOutBuf) {
    otuple.clear(); // reset all attributes to their default values
    otuple.assignFrom(tp, false); // move input fields to output tuple

    /* Dig out what was returned and add to output tuple */
    if (currentOutBuf->_missing_prediction == false)
        otuple.set_predLabel(currentOutBuf->prediction);
    if (currentOutBuf->_missing_confidence == false)
        otuple.set_confidence(currentOutBuf->confidence);

    submit(otuple, 0);

    /* move to next output buffer in list */
    outBuffer* nextPtr = (outBuffer*)(currentOutBuf->next);
    delete currentOutBuf; // free the memory allocated in the output iterator
    currentOutBuf = nextPtr;
} // end of while loop

Operator prepareToShutdown method

Close the image

When the operator has been told to shut down, we close the image using clemrtl_closeImage(), allowing for a graceful shutdown.

Listing 23. Closing the image
/* close the image */
  res = clemrtl_closeImage(image_handle);
  if (res != CLEMRTL_OK) {
    status = EXIT_FAILURE;
    SPLLOG(L_ERROR, "Close Image Failed", "MPK");
    displayError(res, image_handle);
  }

We have completed the development of our scoring operator. Next, we will see how a Streams application developer can use that operator in a Streams application.


Using the operator

Using the scoring model operator in an InfoSphere Streams application

Integrating a scoring model into a Streams application would typically be done by a Streams application developer. In a real Streams application, the input data might come from one or more continuously streaming sources, with upstream operators producing one tuple of information at a time. For our tutorial, we simulate this by using a file containing rows to be scored, with the InfoSphere Streams FileSource operator reading the information and producing a stream of these tuples.

In a real streaming application, the output scores would be processed by further downstream application segments, written to external systems or saved in historical data stores. Here, we will simulate this by having the scored tuples written to an output file one tuple at a time using the InfoSphere Streams FileSink operator.

The inputs and outputs expected by the operator were built into the operator and need to be communicated by the component developer to the application developer. In our case, the component developer would communicate that this operator expects two input attributes: s_sex (an rstring value of M or F) and s_income (an int64 value), and produces two output attributes: predLabel (an rstring T or F indicating whether this input predicts a preference for purchasing the combination of beer, beans, and pizza) and confidence (a float64 representing the confidence of that prediction).

Running the sample SPL application

Requirements and setup

  1. To build and run the sample application, you need a functioning InfoSphere Streams environment.
  2. You need the IBM SPSS Modeler Solution Publisher Runtime 14.2 fixpack 1, plus the Solution Publisher hot fix (scheduled to be available 14 Oct 2011) installed in this environment.
  3. You also need to ensure that LD_LIBRARY_PATH is set, on every system where the Streams operator will be deployed, to include the necessary Solution Publisher libraries.

LD_LIBRARY_PATH requirement

Assuming Solution Publisher is installed in $INSTALL_PATH, the LD_LIBRARY_PATH needs to include:

  • $INSTALL_PATH
  • $INSTALL_PATH/ext/bin/*
  • $INSTALL_PATH/jre/bin/classic
  • $INSTALL_PATH/jre/bin

A script, ldlibrarypath.sh, included in the ZIP file sets the path correctly. If Solution Publisher is not installed in the default location, change the first line of the script to point to your Solution Publisher install directory before using it. For example, if Solution Publisher is installed in /homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2, set CLEMRUNTIME=/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2.

Sample contents

The sample ZIP file (see Download) contains the .pim, .par, and XML files from the Market Basket Analysis sample with a scoring branch added; sample input and expected output files; and a complete Streams Processing Language application, including the custom operator that scores the Market Basket Analysis model.

We provide a simple SPL application in baskrule/MpkSimpleSpss.spl that looks like Figure 6.

Figure 6. SPL application
Image shows SPL application with FileSource, MyOp operator, and FileSink

Adjusting and compiling the sample

To run the sample SPL application, unzip the baskrule.zip file (see Download) to your Linux system that has InfoSphere Streams and Solution Publisher installed. If the Solution Publisher Install location is different from the default value of /opt/IBM/SPSS/ModelerSolutionPublisher/14.2, modify the operator XML file (MyOp.xml) in the MyOp directory. Change the libPath and includePath entries to match your Solution Publisher install location.

Listing 24. libPath and includePath entries
<cmn:libPath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2</cmn:libPath>
<cmn:includePath>/opt/IBM/SPSS/ModelerSolutionPublisher/14.2/clemrtl/include</cmn:includePath>

You also need to modify the MpkSimpleSpss.spl file by adding the SP_Install parameter on the operator's invocation.

Listing 25. Adding SP_Install parameter
stream<DataSchemaPlus> scorer = MyOp(data){
    param SP_Install: "/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2";
  }

Compiling the sample

To compile the sample as a stand-alone Streams application, change directory to where you unzipped the sample project (baskrule) and run the make command as shown below.

Listing 26. Running the make command
bash-3.2$ cd STSPTest/baskrule/
bash-3.2$ make
/homes/hny1/koranda/InfoSphereStreams64/bin/sc -a -T -M Main 
Creating types...
Creating functions...
Creating operators...
Creating PEs...
Creating standalone app...
Creating application model...
Building binaries...
 [CXX-type] tuple<rstring s_sex,int64 s_income>
 [CXX-operator] data
 [CXX-operator] scorer
 [CXX-type] tuple<rstring s_sex,int64 s_income,rstring predLabel,float64 confidence>
 [CXX-operator] Writer
 [CXX-pe] pe0
 [CXX-standalone] standalone
 [LD-standalone] standalone
 [LN-standalone] standalone 
 [LD-pe] pe0 
bash-3.2$

Executing the sample

To execute the application, make sure you have set the LD_LIBRARY_PATH. Listing 27 shows the command to set the path and echo the path back to visually verify that it was set correctly.

Listing 27. Setting the path
bash-3.2$ source ldlibrarypath.sh 
bash-3.2$ echo $LD_LIBRARY_PATH
/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2:/homes/hny1/koranda/IBM
/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/pasw.adp:/homes/hny1/koranda/IBM/SPSS/
ModelerSolutionPublisher64P/14.2/ext/bin/pasw.alm:/homes/hny1/koranda/IBM/SPSS/Modeler
SolutionPublisher64P/14.2/ext/bin/pasw.bagging:/homes/hny1/koranda/IBM/SPSS/ModelerSo
lutionPublisher64P/14.2/ext/bin/pasw.boosting:/homes/hny1/koranda/IBM/SPSS/ModelerSolu
tionPublisher64P/14.2/ext/bin/pasw.cognos:/homes/hny1/koranda/IBM/SPSS/ModelerSolutio
nPublisher64P/14.2/ext/bin/pasw.common:/homes/hny1/koranda/IBM/SPSS/ModelerSolution
Publisher64P/14.2/ext/bin/pasw.me:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublish
er64P/14.2/ext/bin/pasw.netezzaindb:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPubl
isher64P/14.2/ext/bin/paswneuralnet:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPubli
sher64P/14.2/ext/bin/pasw.outerpartition:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionP
ublisher64P/14.2/ext/bin/pasw.pmmlmerge:/homes/hny1/koranda/IBM/SPSS/ModelerSolutio
nPublisher64P/14.2/ext/bin/pasw.psm:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPub
lisher64P/14.2/ext/bin/pasw.scoring:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPubli
sher64P/14.2/ext/bin/pasw.split:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher
64P/14.2/ext/bin/pasw.transformation:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPub
lisher64P/14.2/ext/bin/pasw.tree:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher
64P/14.2/ext/bin/pasw.vi:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.
2/ext/bin/pasw.xmldata:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2
/ext/bin/spss.bayesiannetwork:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher
64P/14.2/ext/bin/spss.binning:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher6
4P/14.2/ext/bin/spss.C5:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.
2/ext/bin/spss.inlinecsp:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.
2/ext/bin/spss.knn:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ex
t/bin/spss.modelaccreditation:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher6
4P/14.2/ext/bin/spss.modelevaluation:/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPub
lisher64P/14.2/ext/bin/spss.predictoreffectiveness:/homes/hny1/koranda/IBM/SPSS/Model
erSolutionPublisher64P/14.2/ext/bin/spss.predictorstat:/homes/hny1/koranda/IBM/SPSS/M
odelerSolutionPublisher64P/14.2/ext/bin/spss.propensitymodelling:/homes/hny1/koranda/I
BM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.psmmodel:/homes/hny1/koranda
/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.selflearning:/homes/hny1/kora
nda/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.svm:/homes/hny1/koranda
/IBM/SPSS/ModelerSolutionPublisher64P/14.2/ext/bin/spss.xd:/homes/hny1/koranda/IBM/S
PSS/ModelerSolutionPublisher64P/14.2/jre/bin/classic:/homes/hny1/koranda/IBM/SPSS/Mo
delerSolutionPublisher64P/14.2/jre/bin
bash-3.2$

Next, change directory to the data directory under baskrule and execute the stand-alone program as shown in Listing 28.

Listing 28. Executing the stand-alone program
bash-3.2$ cd data/
bash-3.2$ ../output/bin/standalone

You should see a number of informational messages showing the processing, similar to the following. Note that the timestamp, process, class, method, and line information have been removed for brevity and readability; a full message looks like: 09 Aug 2011 17:11:07.181 [21715] INFO spl_pe M[PEImpl.cpp:process:483] - Start processing....

Listing 29. Informational messages
bash-3.2$ ../output/bin/standalone
- Start processing...
- About to clemrtl initialise using SP_Install of: 
        "/homes/hny1/koranda/IBM/SPSS/ModelerSolutionPublisher64P/14.2"
- After clemrtl initialise
- Major=14 Minor=2 Release=0 build=0
- Image Handle Retrieved: 1
- About to get field count
- Field Count is: 2
- About to get Field Types
- Field Type 0 is: STRING
- Field Type 1 is: LONG
- About to get Output Field Count
- Output Field Count is: 4
- About to get output field types
- Field Type 0 is: STRING
- Field Type 1 is: LONG
- Field Type 2 is: STRING
- Field Type 3 is: DOUBLE
- About to set alternative Input
- After Set Alternative input
- About to set alternative output
- After Set Alternative Output
- About to prepare
- After Prepare
- Leaving Constructor
- Opening ports...
- Opened all ports...
- Creating 1 operator threads
- Created 1 operator threads
- Processing tuple from input port 0 {s_sex="F",s_income=10000}
- Joining operator threads...
- About to execute the image
 - In next_record iterator
- In next_record iterator
- After Execute
- Sending tuple to output port 0 
        {s_sex="F",s_income=10000,predLabel="F",confidence=0.988327}
- Processing tuple from input port 0 {s_sex="F",s_income=20000}
- About to execute the image
- In next_record iterator
- In next_record iterator
- After Execute
- Sending tuple to output port 0 
        {s_sex="F",s_income=20000,predLabel="F",confidence=0.989645}
...
      similar lines for other input tuples omitted
...
- Joined all operator threads...
- Joining window threads...
- Joined all window threads.
- Joining active queues...
- Joined active queues.
- Closing ports...
- Closed all ports...
- Notifying operators of termination...
- About to Close Image with handle1
- After Close Image
- Notified all operators of termination...
- Flushing operator profile metrics...
- Flushed all operator profile metrics...
- Deleting active queues...
- Deleted active queues.
- Deleting input port tuple cache...
- Deleted input port tuple cache.
- Deleting all operators...
- Deleted all operators.
- Terminating...
- Completed the standalone application processing
- Leaving MyApplication::run()
- Shutdown request received by PE...
- shutdownRequested set to true...

To see the results, view the mpkoutput.csv file created by the FileSink.

Listing 30. mpkoutput.csv
bash-3.2$ cat mpkoutput.csv 
"F",10000,"F",0.988326848249027
"F",20000,"F",0.989645351835357
"F",15000,"F",0.988326848249027
"F",25000,"F",0.989645351835357
"F",10000,"F",0.988326848249027
"F",20000,"F",0.989645351835357
"F",15000,"F",0.988326848249027
"F",25000,"F",0.989645351835357
"M",10000,"T",0.838323353293413
"M",20000,"F",0.990963855421687
"M",15000,"T",0.838323353293413
"M",25000,"F",0.990963855421687
"M",10000,"T",0.838323353293413
"M",20000,"F",0.990963855421687
"M",15000,"T",0.838323353293413
"M",25000,"F",0.990963855421687
bash-3.2$

Hooray!

You have seen how to write an operator to invoke SPSS models from a Streams application. The next logical step would be for you to modify the sample adapter to work for your own specific model and data.


Using a different model

Adjusting for a different model

To adjust this sample operator to work for a different input tuple/model combination, you need to modify it in the following places:

  1. _h.cgt file — Adjust the output structure
  2. _cpp.cgt file ...
    1. next_record_back ...
      1. Adjust the output structure
    2. constructor ...
      1. Adjust the .pim and .par file name and locations
      2. Adjust the input and output file tags
      3. Adjust the input field pointer array size
    3. process ...
      1. Adjust load input structure code
      2. Adjust load output tuple code

The sample code provided has annotated those sections that may need to be adjusted. See Listing 31 for an example.

Listing 31. Sections that may need to be adjusted
/***************************************************************
* the following needs to be adjusted to match the pim and par  *
* file locations.  Unqualified gets from data directory.       *
***************************************************************/

     res = clemrtl_openImage("baskrule.pim","baskrule.par", &image_handle);
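
For example, suppose a different model's XML metadata listed just two output fields, a string category and a real score (these field names are hypothetical, for illustration only). The output structure in the _h.cgt file would change accordingly:

// Hypothetical outBuffer for a model whose <outputDataSource> lists
// <field storage="string"><name>category</name></field> followed by
// <field storage="real"><name>score</name></field>; field order must
// match the order in the XML metadata.
typedef struct {
    void* next;                // linked-list pointer, as in the sample
    const char* category;      // storage="string" -> const char*
    boolean _missing_category;
    double score;              // storage="real"   -> double
    boolean _missing_score;
} outBuffer;

next_record_back would then test and copy row[0] and row[1] into these fields, and the process method would set the corresponding output tuple attributes.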

Type matching

An important aspect necessary to support successful integration is ensuring that the tuple attributes are compatible with the storage required by the model fields.

The Solution Publisher documentation describes the mapping from Modeler types, as defined in SPSS Modeler, to their typical C declarations. For example, the Modeler type STRING is interpreted as a UTF-8 null-terminated character string and declared as const char*, while the Modeler type LONG is interpreted as a 64-bit signed integer and declared as long long.

The InfoSphere Streams documentation provides a table that describes how to map SPL types to their C++ equivalents. For example, an SPL rstring maps to SPL::rstring, which has a C++ base implementation of std::string, and an SPL int64 maps to SPL::int64, which has a C++ base implementation of int64_t.

Table 1 describes the complete mapping from Modeler types to Streams SPL types.

Table 1. Type mapping
Modeler type   XML metadata tag   SP field type (returned from PIM)   C declaration (from SP documentation)   SPL type
String         string             STRING                              const char *                            rstring
Integer        integer            LONG                                long long                               int64
Real           real               DOUBLE                              double                                  float64
Time           time               TIME                                long long                               int64
Date           date               DATE                                long long                               int64
Timestamp      timestamp          TIMESTAMP                           long long                               int64
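
As a concrete illustration of the table, the following self-contained sketch (field positions are hypothetical) shows how raw row elements returned by Solution Publisher map to SPL-compatible C++ values:

#include <string>
#include <stdint.h>

// Illustration only: interpret one output row per the table above.
// Position 0 holds a String field, 1 an Integer, 2 a Real; a NULL
// entry means the value is missing, as in next_record_back.
static void extractRow(void** row, std::string& s, int64_t& n, double& d) {
    if (row[0]) s = (const char*) row[0];    // STRING -> rstring
    if (row[1]) n = *((long long*) row[1]);  // LONG   -> int64
    if (row[2]) d = *((double*) row[2]);     // DOUBLE -> float64
}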

Summary

Results

This tutorial has shown how you can wrap the execution of any SPSS Modeler predictive analytic that is compatible with the Solution Publisher API restrictions and that makes sense to execute one tuple at a time. It has also provided insight into how the sample operator could be adjusted to handle different models.

Note that there are other ways to execute scoring models in InfoSphere Streams, through PMML and the Streams Mining Toolkit. The direct wrapping technique and integration with SPSS models through the Solution Publisher interface shown here opens scoring up to a much larger set of models than are supported through the PMML integrations of the Mining Toolkit.

Future possibilities

Possibilities for extending this work:

  • Producing a generic operator with mapping code driven off parameters and the XML metadata allowing for models to be incorporated without the need to modify the operator C++ template code (see Part 2 of this series).
  • Providing customizable error behavior when execution fails or data is malformed.
  • Providing better support for dynamically updating the model. You could simply stop and restart the operator, causing it to reload the .pim and .par files, but a better approach would be something akin to the Mining Toolkit's optional input port for feeding in new models, with the operator managing the replacement of the model.

Download

Description: Sample streams operator, program, models and data
Name: baskrule.zip
Size: 25KB

Resources

Get products and technologies

  • Build your next development project with IBM trial software, available for download directly from developerWorks.

