Calling Python code from IBM InfoSphere Streams

The Python programming language is a popular choice among enterprise developers to quickly put together working solutions. Many companies adopt Python to build IT assets for regular use. IBM InfoSphere® Streams is a middleware product designed for implementing logic directly in C++ and Java™ technology. Read on to learn how to call Python code directly from IBM InfoSphere Streams applications.


Senthil Nathan, Senior Technical Staff Member, IBM TJ Watson Research Center

Senthil Nathan is a senior technical staff member at the IBM Thomas J. Watson Research Center in Yorktown Heights, New York. He has 28 years of experience in building software for different technology areas, including SOA, Web services, Java Enterprise Edition, PHP, Web 2.0, and Ajax. At the time of writing this article, his full job focus is on the InfoSphere Streams product and its commercial applications.

Glenn Hochberg, Principal Member of Technical Staff, AT&T

Glenn Hochberg is a principal member of technical staff at AT&T's Chief Security Office in Florham Park, N.J. For almost 35 years, he has developed software using a variety of languages and technologies, including communication protocols in assembler, transaction servers in C++, VoIP and IVR applications in Java and VXML, and data analysis tools in Python. He is currently the lead developer on AT&T's Internet Data Security Analysis platform (Flood).

15 April 2013 (First published 05 April 2013)



IBM InfoSphere Streams is high-performance real-time event processing middleware. Its unique strength lies in its ability to ingest structured and unstructured data from a variety of data sources for performing real-time analytics. It does this through a combination of an easy-to-use application development language called Streams Processing Language (SPL) and a distributed runtime platform. This middleware also provides a flexible application development framework to integrate code written in C++ and Java into Streams applications. In addition to C++ and Java, many developers who build real-world IT assets also use dynamic programming languages. With its strength in system integration capabilities, Python is a viable option for many companies to quickly build solutions. For those with existing assets written in Python, there is a way to integrate Python code inside Streams applications. This article explains the details about doing that through a simple Streams application example.

This article assumes familiarity with InfoSphere Streams and its SPL programming model. Working knowledge in C++ and Python is also needed to understand the programming techniques. For in-depth details about InfoSphere Streams and Python, see Resources.

InfoSphere Streams is a key component in IBM's big data platform strategy. Many of IBM's current and prospective customers with Python assets and skills can take advantage of them by combining them with InfoSphere Streams. This article is targeted at readers whose technical focus is big data applications, including application designers, developers, and architects.

Example scenario

In order to explain the nitty-gritty technical details involved in calling Python code from a Streams application, we will stick to a simple example. This scenario involves reading the names of a few web addresses from an input CSV file and calling a simple user-written Python function that will return the following details as its result. We will then write the result for each web address into a separate output CSV file:

  • Primary hostname of the URL
  • List of alternate hostnames for the URL
  • List of IP addresses for the URL
  • Company name specified in the URL string


Code snippets used below explain the implementation details for the scenario explained above. This example code can also be downloaded so you can run it on your own IBM InfoSphere Streams installation. The example code was tested in the following environment:

  • RedHat Enterprise Linux 6.1 or above (or an equivalent CentOS version)
  • gcc version 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)
  • Python 2.6.6 (r266:84292, Apr 11 2011, 15:50:32, shipped with RHEL6)
  • /usr/lib/
  • /usr/include/python2.6 directory with Python.h and other include files
  • IBM InfoSphere Streams 3.x configured with a working Streams instance

The same techniques could work in slightly different environments (e.g., RHEL 5.8 and Streams) with some tweaks to the code or environment setup.

High-level application components

In our simple example scenario, there are three major components. Each component is independent enough to be in its own project because of the natural separation by the programming language used in each of them:

  • UrlToIpAddress Python script
  • StreamsToPythonLib C++ project
  • streams-to-python SPL project

UrlToIpAddress is a Python script with simple logic that uses Python APIs to get IP address and hostname information for a given web address. This script can be tested independently using the Python interpreter. This tiny script plays a major part in this article in demonstrating how to call functions in a Python script from a Streams application.

StreamsToPythonLib is a C++ project that holds the source code for the SPL native function logic. This code primarily uses the Python/C API to embed Python code during the execution of C++ code; embedding Python in C++ code is well described in the Python documentation. The project also contains an important wrapper include (.h) file, which provides an entry point for a Streams SPL application to call any C++ class method. All the C++ logic in this project is compiled into a shared object library (.so) file and made available to the SPL application.

streams-to-python is a Streams SPL project. Inside of it, we have a basic SPL flow graph to make a call chain (SPL<-->C++<-->Python). This SPL code reads URLs from an input file in the data directory, calls the C++ native function to execute the Python code, receives the results, and writes them to an output file in the data directory. Inside the SPL project directory, a native function model XML file outlines the meta information needed to directly call a C++ class method from SPL. This information covers the C++ wrapper include file name, the C++ namespace containing the wrapper functions, the C++ wrapper function prototype expressed using SPL syntax and types, the name and location of the shared object library created from the C++ project, the location of the wrapper include file, and so on.

In the following sections, we will dive deep into each of these three application components and explain the Python, C++, and SPL code in a detailed manner.

Python logic

Listing 1 shows the Python code. This is the business logic we want to call from Streams.

Listing 1. UrlToIpAddress.py
import re, sys, socket

def getCompanyNameFromUrl(url):
    # Do a regex match to get just the company/business part in the URL.
    # Example: In "", it will return "ibm".
    escapedUrl = re.escape(url)
    m = re.match(r'www\.(.*)\..{3}', url)
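    # group(1) is the text between "www." and the final dot. Return an
    # empty string if the URL does not match the expected pattern.
    x = m.group(1) if m else ""
    return (x)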

def getIpAddressFromUrl(url):
    # The following python API will return a triple
    # (hostname, aliaslist, ipaddrlist)
    # hostname is the primary host name for the given URL
    # aliaslist is a (possibly empty) list of alternative host names for the same URL
    # ipaddrlist is a list of IPv4 addresses for the same interface on the same host
    # aliaslist and ipaddrlist may have multiple values separated by
    # comma. We will remove such comma characters in those two lists.
    # Then, return back to the caller with the three comma separated
    # fields inside a string. This can be done using the Python 
    # list comprehension.
    return(",".join([str(i).replace(",", "") for i in socket.gethostbyname_ex(url)])) 

if ((__name__ == "__main__") and (len(sys.argv) >= 2)):
    url = sys.argv[1]
    # print("url=%s" % (url, ))
    print "IP address of %s=%s" % (url, getIpAddressFromUrl(url))
    print "Company name in the URL=%s" % repr(getCompanyNameFromUrl(url))
elif ((__name__ == "__main__") and (len(sys.argv) < 2)):
    sys.exit("Usage: python")

It is evident from Listing 1 that the Python code is deliberately kept simple for clarity. This has two Python functions followed by a code snippet that will run when the Python script is executed using a Python interpreter. To verify that the code works as expected, this script can be run from a shell window: python

At the top of the file, Python modules, such as regular expression and socket, are imported. The first function is getCompanyNameFromUrl, which takes a web address as input. It does a regular expression match to parse the company name from the web address and returns the company name to the caller. The next function is getIpAddressFromUrl. It also takes a web address as input. It calls a Python socket API to get the IP address of the given web address. In particular, this Python API (gethostbyname_ex) returns a tuple with three elements in it. These three elements provide the hostname of the server for the given web address, alternate hostnames if any, and one or more IP addresses for that server. Instead of returning the tuple type to the caller, this function removes any commas inside each element and then flattens the three elements into a single Python string by joining them with commas. It returns that string to the caller.
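To see what the two functions produce without touching the network, the same expressions can be applied to a hand-made triple. The following is a minimal sketch, assuming Python 3; the function names and the sample hostnames and addresses (taken from reserved documentation ranges) are illustrative and not part of the downloadable example.

```python
import re


def flatten_host_info(triple):
    # Same expression as in Listing 1, applied to a triple supplied by the
    # caller instead of a live socket.gethostbyname_ex() result.
    return ",".join([str(i).replace(",", "") for i in triple])


def company_from_url(url):
    # Same pattern as in Listing 1; returns "" when nothing matches.
    m = re.match(r'www\.(.*)\..{3}', url)
    return m.group(1) if m else ""


if __name__ == "__main__":
    sample = ("www.example.com",
              ["alias1.example.com", "alias2.example.com"],
              ["192.0.2.1", "192.0.2.2"])
    print(flatten_host_info(sample))
    print(company_from_url("www.example.com"))
```

Because the commas inside each list are removed before joining, the flattened string always splits into exactly three comma-separated fields, which is what the C++ code relies on later.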

The purpose of this example is to learn about calling those two Python script functions from within a Streams application. We will focus on that in the following sections.

C++ logic

InfoSphere Streams allows for the inclusion of code written in C++ in two ways. One way is to build primitive Streams operators in C++, thereby incorporating the business logic written in C++. The other option is to execute any arbitrary C++ class methods directly from SPL as native functions. In this exercise, we will use the native function approach. To do that, we will create a separate C++ project named StreamsToPythonLib, in which we will write the necessary code to call the Python functions we covered in the previous section. Then we will create a shared object (.so) library to make this C++ code available to the Streams SPL application.

Table 1 shows the contents of the StreamsToPythonLib C++ project directory.

Table 1. StreamsToPythonLib C++ project directory
File                        Description
StreamsToPython.h           C++ class interface file
StreamsToPython.cpp         C++ class implementation file
StreamsToPythonWrappers.h   C++ include file containing the Streams native function code
mk                          A script to build the shared object (.so) library of this C++ project

Listing 2. StreamsToPython.h

#ifndef STREAMS_TO_PYTHON_H_
#define STREAMS_TO_PYTHON_H_

// To avoid a redefinition compiler error, undefine the following.
// (Python.h defines these feature-test macros itself.)
#undef _POSIX_C_SOURCE
#undef _XOPEN_SOURCE
// This should be the first include file (according to Python documentation)
#include "Python.h"

// Include files that define SPL types and functions.
#include "SPL/Runtime/Function/SPLFunctions.h"
#include <SPL/Runtime/Utility/Mutex.h>
#include <SPL/Runtime/Type/ValueHandle.h>
// Include standard C++ include files.
#include <sys/time.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <sstream>

using namespace std;
// This will allow us to access the types and functions from SPL namespace.
using namespace SPL;

// Your #define constant definitions go here.

// Class definition follows here.
namespace calling_python_from_streams {
   class GlobalStreamsToPythonSession {
      private:
         // This member variable tells us if a global
         // streams to Python caller handle already
         // exists for a given PE/process.
         boolean streamsToPythonHandleExists;

         // Following member variables are required for
         // calling Python C APIs.
         static boolean pyInitialized;
         static boolean importFailed;
         PyObject* pFunc1;
         PyObject* pFunc2;

      public:
         GlobalStreamsToPythonSession();
         virtual ~GlobalStreamsToPythonSession();

         // This method establishes StreamsToPython handle for a given PE/process.
         int32 initializeStreamsToPython();
         // This method gets the IP address of a given URL.
         boolean getIpAddressFromUrl(rstring const & url,
            rstring & primaryHostName, rstring & alternateHostNames,
            rstring & ipAddressList, rstring & companyName);

         // Get the global (Singleton) Streams to Python session object.
         static GlobalStreamsToPythonSession & getGlobalStreamsToPythonSession();
   };
}

#endif /* STREAMS_TO_PYTHON_H_ */

Listing 2 is the C++ class interface file. It starts off with an inclusion of Python.h, which is a must for our task of calling into the native Python code. It includes standard library header files along with SPL include files. It is important to note that by including SPL header files and by using the SPL namespace, we can access SPL data types inside C++. Many of the primitive and collection data types in SPL are representations of equivalent C++ built-in data types. Inside the namespace and class sections, member variables and member methods are declared. There are a few Python object-related member variables, which we will cover later. There are prototypes declared for the class constructor, the destructor, and the business logic method that will get called from SPL. At the end, there is a static method, getGlobalStreamsToPythonSession, that provides singleton access to this C++ class from the SPL code. We will see more details about all of this shortly.

Listing 3. StreamsToPython.cpp
#include "StreamsToPython.h"
#include <dlfcn.h>

namespace calling_python_from_streams {
   // Initialize the static member variables in this class.
   boolean GlobalStreamsToPythonSession::pyInitialized = false;
   boolean GlobalStreamsToPythonSession::importFailed = false;

   // Define a constructor for the global streams to python session class.
   GlobalStreamsToPythonSession::GlobalStreamsToPythonSession() {
      // No streams to python session handle exists at this time.
      streamsToPythonHandleExists = false;
      pFunc1 = NULL;
      pFunc2 = NULL;

      // Let us load the python shared library.
      // This was tested using Python 2.6 that shipped with RHEL6.1.
      // If you are testing it on a different version of RHEL,
      // please load the correct version of python below.
      SPLAPPTRC(L_ERROR, "Calling dlopen", "STREAMS_TO_PYTHON");
      void* handle = dlopen("", RTLD_NOW|RTLD_GLOBAL);

      if (handle == NULL) {
         SPLAPPTRC(L_ERROR, "dlopen failed " << dlerror(), "STREAMS_TO_PYTHON");
      }

      // Per PE/process, we want to call the python initialize API only once.
      if ((handle != NULL) && (pyInitialized == false)) {
         // We expect the actual Python script we will execute below to be
         // available in the Streams application's data directory.
         // That data directory is the current working directory for the
         // Streams application. Let us add the current working directory
         // to the PYTHONPATH environment variable.
         // Without doing this, we will not be able to load the
         // Python script later using the PyImport_Import API.
         setenv("PYTHONPATH", ".", 1);
         // The interpreter must be initialized before any other
         // Python/C API function is used.
         Py_Initialize();
         pyInitialized = true;
      }
   }

   // Define a destructor method to do any cleanup.
   GlobalStreamsToPythonSession::~GlobalStreamsToPythonSession() {
      if (streamsToPythonHandleExists == true) {
         streamsToPythonHandleExists = false;
      }

      // We can decrement the reference count for the Python functions.
      if (pFunc1 != NULL) {
         Py_DECREF(pFunc1);
      }

      if (pFunc2 != NULL) {
         Py_DECREF(pFunc2);
      }
   }

   // Define a method that creates and returns a singleton access to
   // our global StreamsToPython session object. This will guarantee us that there will
   // always be only one instance of this class per PE.
   GlobalStreamsToPythonSession & 
      GlobalStreamsToPythonSession::getGlobalStreamsToPythonSession() {
      // A static variable that will get initialized only once per process.
      static GlobalStreamsToPythonSession gstps;
      // Return our singleton global streams to python session object.
      return gstps;
   }

   // This method initializes the streams to python native function module.
   // It is done only once per PE/process. (This is an optional method that
   // a Streams PE can call to do any application-specific initialization of
   // state information stored in this C++ class.)
   int32 GlobalStreamsToPythonSession::initializeStreamsToPython() {
      SPLAPPTRC(L_ERROR, "initializeStreamsToPython request received from a thread id "
         << pthread_self() << " within the process id " <<
         getpid() << ".", "STREAMS_TO_PYTHON");

      // If we already have established a streams to python handle for this 
      // PE/process, then return now.
      if (streamsToPythonHandleExists == true) {
         SPLAPPTRC(L_ERROR, "A global and a shared StreamsToPython handle has " <<
            "already been made by another thread in the same process id.",
            "STREAMS_TO_PYTHON");
         return (0);
      }

      SPLAPPTRC(L_ERROR, "This global StreamsToPython handle can now be shared " <<
         "by multiple threads within the process id " << getpid() << ".",
         "STREAMS_TO_PYTHON");

      streamsToPythonHandleExists = true;
      return (0);
   }

   // This is the C++ native function method that can be called from the 
   // SPL application code to invoke Python functions for getting
   // IP address details of a given URL.
   // It returns true on success and false on any failure.
   boolean GlobalStreamsToPythonSession::getIpAddressFromUrl(
      rstring const & url,
      rstring & primaryHostName, rstring & alternateHostNames,
      rstring & ipAddressList, rstring & companyName) {
      // We must first get a pointer to the Python functions we want to call.
      // If an earlier attempt to access the python functions failed, we can't proceed.
      if (importFailed == true) {
         return (false);
      }

      // For the very first time we are here, let us get a pointer to
      // the first python function.
      if (pFunc1 == NULL) {
         PyObject *pName, *pModule;
         pName = pModule = NULL;
         // Get a python string for the name of our Python module
         // (i.e. Python source file name)
         pName = PyString_FromString("UrlToIpAddress");
         if (pName == NULL) {
            SPLAPPTRC(L_ERROR, "Failed to convert UrlToIpAddress to python string.",
               "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            return (false);
         }

         // Using the python string carrying the python file name,
         // let us import the module now.
         pModule = PyImport_Import(pName);
         // We can release pName since we don't need it anymore.
         Py_DECREF(pName);

         if (pModule == NULL) {
            SPLAPPTRC(L_ERROR, "Failed to import UrlToIpAddress python module.",
               "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            return (false);
         }

         // We have to check if this python module has our 
         // first function getIpAddressFromUrl.
         if (PyObject_HasAttrString(pModule, "getIpAddressFromUrl") == 0) {
            SPLAPPTRC(L_ERROR, "Module doesn't have the getIpAddressFromUrl function.",
               "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            // Decrement the reference count for the module.
            Py_DECREF(pModule);
            return (false);
         }

         // Let us get a pointer to the python function we want to call.
         pFunc1 = PyObject_GetAttrString(pModule, "getIpAddressFromUrl");

         if (pFunc1 == NULL) {
            SPLAPPTRC(L_ERROR, "Unable to get the getIpAddressFromUrl function " 
               << "from the python module.", "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            // Decrement the reference count for the module.
            Py_DECREF(pModule);
            return (false);
         }

         // Now that we have a pointer to the python function, let us verify that
         // we can indeed make a call to that function.
         if (PyCallable_Check(pFunc1) == 0) {
            SPLAPPTRC(L_ERROR, "Python function getIpAddressFromUrl is not callable.",
               "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            // Decrement the reference count for the module.
            Py_DECREF(pModule);
            return (false);
         }

         // Let us get a pointer to our second python function as well now.
         // We have to check if this python module has our 
         // second function getCompanyNameFromUrl.
         if (PyObject_HasAttrString(pModule, "getCompanyNameFromUrl") == 0) {
            SPLAPPTRC(L_ERROR, "Module doesn't have the getCompanyNameFromUrl function.",
               "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            // Decrement the reference count for the module.
            Py_DECREF(pModule);
            return (false);
         }

         // Let us get a pointer to the second python function we want to call.
         pFunc2 = PyObject_GetAttrString(pModule, "getCompanyNameFromUrl");

         if (pFunc2 == NULL) {
            SPLAPPTRC(L_ERROR, "Unable to get the getCompanyNameFromUrl function " 
               << "from the python module.", "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            // Decrement the reference count for the module.
            Py_DECREF(pModule);
            return (false);
         }

         // Now that we have a pointer to the second python function, let us verify that
         // we can indeed make a call to that function.
         if (PyCallable_Check(pFunc2) == 0) {
            SPLAPPTRC(L_ERROR, "Python function getCompanyNameFromUrl is not callable.",
               "STREAMS_TO_PYTHON");

            if (PyErr_Occurred()) {
               PyErr_Print();
            }

            importFailed = true;
            // Decrement the reference count for the module.
            Py_DECREF(pModule);
            return (false);
         }

         // Everything went well.
         // We will keep the references to our python functions for making future calls.
         // We can dereference the pointer to the module.
         Py_DECREF(pModule);
      }

      // We can now call the python functions.
      rstring resultFromPythonFunction1 = "";
      rstring resultFromPythonFunction2 = "";
      PyObject *pValue = NULL;
      pValue = PyObject_CallFunction(pFunc1, (char*) "s", url.c_str());

      // pFunc1 returns a string with three comma-separated parts in it.
      if (pValue != NULL) {
         const char* pStr = PyString_AsString(pValue);

         // pStr points into the buffer owned by pValue, so we copy the
         // string before releasing pValue.
         if (pStr != NULL) {
            resultFromPythonFunction1 = pStr;
         } else {
            if (PyErr_Occurred()) {
               PyErr_Print();
            }
         }

         // Decrement the reference count for pValue.
         Py_DECREF(pValue);
      } else {
         SPLAPPTRC(L_ERROR, "Python function call to getIpAddressFromUrl failed.",
            "STREAMS_TO_PYTHON");

         if (PyErr_Occurred()) {
            PyErr_Print();
         }

         return (false);
      }

      // Let us call the 2nd python function.
      pValue = PyObject_CallFunction(pFunc2, (char*) "s", url.c_str());

      if (pValue != NULL) {
         const char* pStr = PyString_AsString(pValue);

         if (pStr != NULL) {
            resultFromPythonFunction2 = pStr;
         } else {
            if (PyErr_Occurred()) {
               PyErr_Print();
            }
         }

         // Decrement the reference count for pValue.
         Py_DECREF(pValue);
      } else {
         SPLAPPTRC(L_ERROR, "Python function call to getCompanyNameFromUrl failed.",
            "STREAMS_TO_PYTHON");

         if (PyErr_Occurred()) {
            PyErr_Print();
         }

         return (false);
      }

      // We have successfully called two python functions.
      // We have to parse the result strings from those function calls and
      // assign the results to the references passed to this C++ method.
      // Result from the first python call is a string containing 
      // three parts separated by commas.
      // Part1: Primary host name.
      // Part2: Zero or more alternative hostnames returned in a list
      // i.e. ['host1' 'host2' 'host3']
      // Part3: IP Address list. i.e. ['' '' '']
      SPL::list<rstring> tokens = 
         SPL::Functions::String::csvTokenize(resultFromPythonFunction1);

      if (SPL::Functions::Collections::size(tokens) < 3) {
         SPLAPPTRC(L_ERROR, "Python function getIpAddressFromUrl didn't return " 
            << "correct results.", "STREAMS_TO_PYTHON");
         return (false);
      }

      // First element in the list is the primary hostname.
      primaryHostName = tokens[0];
      // Alternate hostnames are in the second element of the list.
      alternateHostNames = tokens[1];
      // IP addresses will be in the very last element of the list.
      ipAddressList = tokens[SPL::Functions::Collections::size(tokens)-1];
      companyName = resultFromPythonFunction2;
      return (true);
   }
}

Listing 3 is the implementation class. It opens with include statements for the corresponding interface class and the dynamic library loader. Python allows extension and embedding. Inside the Python code, one can extend it to invoke C functions. Similarly, inside C++ code, one can embed Python code. The main focus in Listing 3 is to use Python/C API to invoke native Python code. Our implementation class has five C++ methods. The following commentary takes a deeper look into each of those methods.

Constructor: The following three major tasks are done when this class is instantiated:

  1. Set the Python path to the current directory.
  2. Initialize the Python interpreter as it must be done before using any Python/C API functions.
  3. Load the libPython shared library dynamically into our process space. Even though the Python shared library would get loaded automatically by the dynamic loader, we have to load it via dlopen so our Python script can link properly with other Python modules implemented as shared object libraries.

Destructor: The following cleanup activities are done when this class object goes out of scope:

  1. Reset the member variable that holds the handle needed for singleton class access.
  2. Clear the handles obtained for both of our Python functions.

getGlobalStreamsToPythonSession: It can be seen from Listing 2 that this method is declared as a static method. This is the entry point into this class when a Streams native function is called. Since we want to have only one instance of this C++ class per Streams processing element (PE), it is necessary to maintain a singleton object of this C++ class. Hence, when this particular method is called, a static object of this class is created and returned to the caller. That is how a Streams application can get a static handle to a C++ object and can arbitrarily call any C++ class method using the static handle.

initializeStreamsToPython: Since we maintain a singleton object of this C++ class per process, it is possible for this class to maintain state variables that can be used and shared across multiple invocations of the methods here. Even though this particular application doesn't store state, this is an important design aspect to keep in mind. A Streams application that uses C++ native functions can use such a method to initialize the state variables. Opening a database connection and storing the connection handle for subsequent database access is a good use of this approach. The application described in this article simply ensures that only the very first call made to this C++ method initializes the global handle indicating that the singleton object of this class has been created.
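The combination of a per-process singleton and a one-time initialization step can be sketched in plain Python as well. This is only an analogy to the C++ design, assuming Python 3: the Session class, its attribute names, and the get_global_session function are hypothetical and do not appear in the example project.

```python
import threading


class Session(object):
    """Hypothetical per-process session that holds shared state."""

    def __init__(self):
        self.handle_exists = False
        self.setup_runs = 0  # how many times the one-time setup actually ran

    def initialize(self):
        # Later calls find the handle already in place and return at once.
        if self.handle_exists:
            return 0
        self.setup_runs += 1  # a real application might open a connection here
        self.handle_exists = True
        return 0


_session = None
_session_lock = threading.Lock()


def get_global_session():
    """Create the session on first use; return the same object afterwards."""
    global _session
    with _session_lock:
        if _session is None:
            _session = Session()
        return _session


if __name__ == "__main__":
    get_global_session().initialize()
    get_global_session().initialize()
    print(get_global_session().setup_runs)
```

The lock plays the role that the function-local static variable plays in C++: however many callers ask for the session, only one object is ever created in the process.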

getIpAddressFromUrl: This is the longest method in this C++ class, and it contains the business logic necessary to call a Python function and fetch its return values. The Python framework provides a comprehensive set of C APIs to embed Python code within a C or C++ application. Having initialized the Python interpreter in the constructor using Py_Initialize, we can use the other Python/C APIs in this method. Callers of this method pass a web address as a method argument (note that the http:// part of a URL should not be included). This method also accepts four other string references as arguments, in which the results are returned to the caller. Since we are using the SPL namespace in this C++ class, we can access SPL data types such as rstring, uint32, and list. Many of the SPL data types are derived from C++ data types such as std::string, int, and vector.

The very first task in this C++ method is to get valid pointers to the two native Python functions we want to call. When this method is called for the first time, we want to get pointers to the two Python functions and store them in the member variables pFunc1 and pFunc2. That will allow us to reuse them in subsequent calls. In order to get pointers to the Python functions, we must first import the Python module containing those two functions. A Python module in this case is nothing but the filename of the Python script minus the .py extension. We have to use PyString_FromString to get a Python string object from a C++ string object holding the Python module name. Then, a call to PyImport_Import will get a handle to our Python module. On an error from any of the Python/C APIs, we will set a member variable called importFailed and return from this method. Subsequent calls to this C++ method will proceed only if importing the Python module succeeded earlier. Such Python/C API errors can be detected and logged using PyErr_Occurred and PyErr_Print APIs. It is also time now to introduce SPLAPPTRC, which is an SPL C++ Macro API that allows us to log application debug or trace information into the Streams logging system. It takes three arguments: log level, a C++ string object containing the log message, and an aspect that can be used for application-specific log filtering.
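The import-once behavior with a sticky failure flag can be illustrated in pure Python. The sketch below is an analogy, assuming Python 3, and not the C++ implementation: the ModuleLoader class and its attributes are hypothetical, importlib.import_module stands in for PyImport_Import, and the standard json module is used only because it is always available.

```python
import importlib


class ModuleLoader(object):
    """Import a module on first use and remember a failed attempt."""

    def __init__(self, name):
        self.name = name
        self.module = None
        self.failed = False
        self.attempts = 0  # number of real import attempts made

    def get(self):
        if self.failed:
            return None  # an earlier import failed; do not retry
        if self.module is None:
            self.attempts += 1
            try:
                self.module = importlib.import_module(self.name)
            except ImportError as err:
                print("Failed to import %s: %s" % (self.name, err))
                self.failed = True
                return None
        return self.module


if __name__ == "__main__":
    print(ModuleLoader("json").get() is not None)
    print(ModuleLoader("module_that_does_not_exist_for_this_sketch").get())
```

Remembering the failure avoids repeating an expensive and noisy import attempt for every incoming tuple when the script is simply missing.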

Having imported our UrlToIpAddress Python module, we now check that the functions we want to call really exist inside the Python module by passing each Python function name to the PyObject_HasAttrString API. After validating that a function is available inside the Python module, we can get a pointer to it by using the PyObject_GetAttrString API. Once we have a valid pointer to the Python function, it is necessary to check that it is indeed callable by using the PyCallable_Check API. After performing these steps successfully, our two C++ member variables (pFunc1 and pFunc2) will point to valid and callable user-written Python functions. Now, we can call the PyObject_CallFunction API to execute a function by passing the pFunc1 or pFunc2 member variable along with a format string and the expected function arguments. In our case, we pass a string (web address) as the only argument to the Python function. Hence, the second argument is the format string "s" to indicate that the argument is a string, and the third argument is the actual web address represented as a regular C string. Since both of our functions return a string as a result, we use the PyString_AsString API to convert the returned Python string object to a regular C string. We store the result strings from both Python functions in our own rstring local variables. As explained in the Python logic section, our first Python function returns its result as a string with three comma-separated parts. To parse the CSV fields, we call the standard SPL toolkit function named csvTokenize and assign the returned values directly to the C++ method argument references passed by the caller. That is what is involved in calling Python functions from C++.
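The check, fetch, verify, call, and parse sequence has a direct counterpart in Python itself, which can help when reasoning about what each Python/C API call does. The sketch below assumes Python 3; the helper names resolve_function and split_result are hypothetical, and a plain split on commas stands in for the SPL csvTokenize function.

```python
import types


def resolve_function(module, name):
    """Return module.name if it exists and is callable, else None."""
    if not hasattr(module, name):   # compare: PyObject_HasAttrString
        return None
    func = getattr(module, name)    # compare: PyObject_GetAttrString
    if not callable(func):          # compare: PyCallable_Check
        return None
    return func


def split_result(flat):
    """Split the flattened result into hostname, aliases, and IP addresses."""
    tokens = flat.split(",")
    if len(tokens) < 3:
        raise ValueError("expected three comma-separated fields")
    return tokens[0], tokens[1], tokens[-1]


if __name__ == "__main__":
    # Build a throwaway module whose function returns a fixed sample string.
    demo = types.ModuleType("demo")
    demo.getIpAddressFromUrl = lambda url: "www.example.com,[],['192.0.2.1']"
    func = resolve_function(demo, "getIpAddressFromUrl")
    print(split_result(func("www.example.com")))
```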

In this C++ implementation class, there are two other important things to highlight. When we used the PyImport_Import API to import our module, how did it know the physical location of the Python script file? If we refer back to the C++ constructor, there is a call to a standard POSIX API (setenv) that sets the PYTHONPATH environment variable to the current directory via a period character. That is the key reason why the PyImport_Import API is able to locate the Python script and import it. In a Streams application, the current working directory is always set to the /data subdirectory within the SPL project directory. Hence, our Python script must be copied to the /data subdirectory. Otherwise, the PyImport_Import API will not be able to locate and import our Python script. Another important thing to notice in this C++ implementation class is the liberal use of the Py_DECREF API. Every Python object has a reference count that records how many places hold a reference to it. When the reference count becomes zero, the object is deallocated. In the Python/C API, reference counts are always manipulated explicitly. Hence, in our code, whenever we no longer need a Python object whose reference we own, we must make a call to the Py_DECREF API.
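The effect of the search path on importing can be tried out from Python directly. Entries in PYTHONPATH are added to sys.path when the interpreter starts, so the sketch below, assuming Python 3, edits sys.path itself to show the same idea. The module name demo_lookup_sketch, its one-line function, and the temporary directory are all hypothetical stand-ins for the real script and the Streams data directory.

```python
import importlib
import os
import sys
import tempfile


def write_demo_module(directory):
    """Write a tiny stand-in script (hypothetical name) into `directory`."""
    path = os.path.join(directory, "demo_lookup_sketch.py")
    with open(path, "w") as handle:
        handle.write("def company(url):\n    return url.split('.')[1]\n")
    return "demo_lookup_sketch"


def import_from_directory(directory, module_name):
    """Put `directory` on the module search path, then import the module."""
    if directory not in sys.path:
        sys.path.insert(0, directory)
    importlib.invalidate_caches()  # make sure the newly written file is seen
    return importlib.import_module(module_name)


if __name__ == "__main__":
    data_dir = tempfile.mkdtemp()
    name = write_demo_module(data_dir)
    print(import_from_directory(data_dir, name).company("www.example.com"))
```

If the directory is left off the search path, the import fails with an ImportError, which corresponds to PyImport_Import returning NULL in the C++ code.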

Listing 4. StreamsToPythonWrappers.h

// Include the file that contains the class definition.
#include "StreamsToPython.h"

namespace calling_python_from_streams {
   // Establish a handle to the StreamsToPython to be
   // accessed within a PE.
   inline int32 initializeStreamsToPython(void) {
      return GlobalStreamsToPythonSession::
         getGlobalStreamsToPythonSession().
         initializeStreamsToPython();
   }

   // Get the IP address of a given URL.
   inline boolean getIpAddressFromUrl(rstring const & url,
      rstring & primaryHostName, rstring & alternateHostNames,
      rstring & ipAddressList, rstring & companyName) {
      return GlobalStreamsToPythonSession::
         getGlobalStreamsToPythonSession().
         getIpAddressFromUrl(url, primaryHostName,
         alternateHostNames, ipAddressList, companyName);
   }
}

Listing 4 is a Streams-specific extension file in the StreamsToPythonLib C++ project. As discussed earlier, a Streams application needs some extra work before it can call a method in a C++ class. That extra work is done in this wrapper include file, which contains inline functions. The file begins by including the C++ class interface file that we saw in Listing 2. The wrapper functions are defined within the same namespace scope as our actual C++ class in the StreamsToPythonLib project. A Streams application can call any of the inline functions specified in this wrapper include file. Every inline function gets a singleton object of the intended C++ class by calling the static method getGlobalStreamsToPythonSession. The first call to this static method performs a static instantiation of the C++ class, and a reference to that same static object is returned on every subsequent call. With a reference to the singleton object in hand, an inline wrapper function can call any C++ method available in that object and pass the return values back to the Streams SPL application. This technique will come in handy in your other real-world Streams projects.
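For readers more at home in Python than in C++, the singleton-plus-wrapper arrangement described above can be sketched as follows. This is an analogy only: the class, its method, and the returned text are hypothetical stand-ins, and the real wrappers are the C++ inline functions in Listing 4.

```python
class StreamsToPythonSession:
    """Hypothetical stand-in for the article's C++ class."""

    instances_created = 0

    def __init__(self):
        StreamsToPythonSession.instances_created += 1

    def get_ip_address_from_url(self, url):
        return "details for " + url


_session = None


def get_global_session():
    """Create the session on first use, then keep returning the same object."""
    global _session
    if _session is None:
        _session = StreamsToPythonSession()
    return _session


def get_ip_address_from_url(url):
    """Thin wrapper that delegates to the singleton, like the inline functions."""
    return get_global_session().get_ip_address_from_url(url)


first = get_ip_address_from_url("www.example.com")
second = get_ip_address_from_url("www.example.org")
```

However many times the wrapper is called, only one session object is ever constructed, which is the property the C++ static method provides within a PE.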

SPL logic

Having covered the Python and C++ components used in this example, it is time for a basic Streams application that ties everything together. We will write a short SPL application whose flow graph uses three Streams operators readily available in the SPL standard toolkit.

Table 2 shows the contents of the streams-to-python SPL project directory.

Table 2. streams-to-python SPL project directory
README.txt                                           File with a brief description of the entire application
python.wrapper.example                               SPL project directory
python.wrapper.example/streams_to_python.spl         Simple SPL file that invokes a C++ native function that in turn calls Python functions
python.wrapper.example/native.function               SPL native function directory
python.wrapper.example/native.function/function.xml  XML file containing the SPL native function model
data                                                 SPL application's data directory
data/UrlInput.csv                                    Input file containing test web addresses
data/UrlToIpAddress.py                               Simple Python script, whose functions will be called from Streams. It is placed here, because this is the current working directory for a Streams application
data/Expected-UrlToIpAddress-Result-Feb2013.csv      CSV file containing expected results from this application as of Feb/2013
impl/lib                                             Directory where the .so library built in the C++ project will be copied. Streams native function model file is configured to load .so from this directory
impl/include                                         Directory where the include files from the above C++ project will be copied. Streams native function model file is configured to look here for the include files
build-standalone.sh                                  Script that will build a stand-alone Streams executable; doesn't need the Streams runtime
build-distributed.sh                                 Script that will build a distributed Streams executable; requires the Streams runtime
run-standalone.sh                                    Script that will run the stand-alone mode executable of this application
run-distributed.sh                                   Script that will run the distributed mode executable of this application
stop-streams-instance.sh                             Script that will stop a specified Streams instance

Listing 5. streams_to_python.spl
namespace python.wrapper.example;

composite streams_to_python {
   // Define input and output schema for this application.
   type
      InputSchema = tuple<rstring url>;
      OutputSchema = tuple<rstring url, rstring primaryHostName,
         rstring alternateHostNames, rstring ipAddressList, rstring companyName>;

   graph
      // Read from an input file all the URLs for which we need to
      // get the corresponding IP addresses.
      stream<InputSchema> UrlInput = FileSource() {
         param
            file: "UrlInput.csv";
            initDelay: 4.0;
      }

      // In the custom operator below, we will call python code to get the
      // primary host name, alternative host names, and IP addresses.
      stream<OutputSchema> IpAddressOfUrl = Custom(UrlInput) {
         logic
            onTuple UrlInput: {
               mutable rstring _primaryHostName = "";
               mutable rstring _alternateHostNames = "";
               mutable rstring _ipAddressList = "";
               mutable rstring _companyName = "";
               // Call the C++ native function that in turn will call Python functions.
               boolean result = getIpAddressFromUrl(UrlInput.url, _primaryHostName,
                  _alternateHostNames, _ipAddressList, _companyName);
               if (result == true) {
                  mutable OutputSchema _oTuple = {};
                  _oTuple.url = UrlInput.url;
                  _oTuple.primaryHostName = _primaryHostName;
                  _oTuple.alternateHostNames = _alternateHostNames;
                  _oTuple.ipAddressList = _ipAddressList;
                  _oTuple.companyName = _companyName;
                  submit(_oTuple, IpAddressOfUrl);
               }
            }
      }

      // Write the results to a file using FileSink.
      () as FileWriter1 = FileSink(IpAddressOfUrl) {
         param
            file: "UrlToIpAddress-Result.csv";
      }
}

Listing 5 is the SPL flow graph that starts by defining a namespace. It is followed by a definition of an SPL main composite. In the types section, two tuple data types are defined for input and output of this application. Then, a basic graph clause is filled with three Streams operators available in the SPL standard toolkit. The first operator is a FileSource, which reads the rows from an input CSV file from the default location (the data subdirectory of the SPL project). Tuples emitted by the FileSource operator are consumed by a Custom operator, which calls an SPL native function (getIpAddressFromUrl) written in C++. As we saw, that C++ code in turn executes the Python functions to return the results for a given web address. Those result values are assigned to an output tuple and submitted from the Custom operator. Finally, a FileSink operator consumes the output tuples from the Custom operator and writes the results to an output CSV file. It is important to note that the C++ native function code is compiled into a shared object (.so) library as explained below.
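To make the data flow of the three operators concrete, here is a rough Python sketch of the same source, transform, and sink shape. It is not how Streams executes the graph. The lookup_stub function and its return values are hypothetical placeholders for the native function, and in-memory text stands in for the input and output files.

```python
import csv
import io


def lookup_stub(url):
    """Hypothetical replacement for the native function; returns (ok, fields)."""
    if not url:
        return False, ()
    return True, ("host." + url, "[]", "['192.0.2.1']", url.split(".")[0])


def run_pipeline(input_text):
    # FileSource: read one row per input line.
    source = csv.reader(io.StringIO(input_text))
    # FileSink: write every field quoted, one result row per line.
    sink_buffer = io.StringIO()
    sink = csv.writer(sink_buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
    # Custom operator: handle each incoming tuple in turn.
    for row in source:
        url = row[0] if row else ""
        ok, fields = lookup_stub(url)
        if ok:  # submit an output tuple only when the lookup succeeded
            sink.writerow((url,) + fields)
    return sink_buffer.getvalue()


output = run_pipeline("example.com\n\nexample.org\n")
```

As in the Custom operator, an input row for which the lookup reports failure (here, the blank line) produces no output row.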

Function model

Listing 6. function.xml
<?xml version="1.0" encoding="UTF-8"?>
<functionModel xmlns:xsi="" 
   " functionModel.xsd">
        <description>Initialize the Streams to Python module</description>
        <prototype>public int32 initializeStreamsToPython()</prototype>
        <description>Get the IP addresses for a given URL</description>
        <prototype>public boolean getIpAddressFromUrl(rstring url, 
        mutable rstring primaryHostName, mutable rstring alternateHostNames,
        mutable rstring ipAddressList, mutable rstring companyName)</prototype>
        <cmn:description>Streams to Python Shared Library</cmn:description>

Listing 6 is the native function model XML file. In Listing 5, we saw a C++ native function being called inside the Custom operator. How does the SPL code find that C++ code? The native function model XML file is the glue between the SPL code and the C++ code. When compiling the SPL code, the Streams compiler resolves the C++ function name through the information we provide in this XML file. At the start of the XML file, we specify the name of the C++ wrapper include file that contains the inline native functions covered in Listing 4. Then we indicate the C++ namespace in which the inline C++ native functions are defined. That is followed by an XML segment in which we declare the prototypes for the C++ native functions. Note that the prototype declarations are specified using the SPL types that correspond to the C++ data types. If a C++ native function expects an argument to be passed by non-const reference so that the function can modify it, that argument should be declared mutable in the function prototype.

If the C++ native function logic is made available via a shared object (.so) file, a library XML segment should be included. In it, we have to specify the library name. (The file name of a Linux library typically starts with the three letters 'lib'; those letters should be omitted when specifying the library name. Similarly, the .so extension is not required.) The location of the .so file and the location of the include files for the shared library must also be specified. It is good practice to bundle the shared object library file and its include files as part of the SPL project directory so that it is easy to ship them across different Streams installations.
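The library naming rule above is easy to get wrong, so here is a small sketch of it. The helper and the file name libStreamsToPythonLib.so are assumptions used only for illustration; the article does not state the actual file name here.

```python
def library_model_name(file_name):
    """Strip a leading 'lib' and a trailing '.so' from a shared library file name."""
    name = file_name
    if name.startswith("lib"):
        name = name[len("lib"):]
    if name.endswith(".so"):
        name = name[:-len(".so")]
    return name


# Hypothetical file name, chosen to match the C++ project name.
example = library_model_name("libStreamsToPythonLib.so")
```

With that assumed file name, the value to enter as the library name would be StreamsToPythonLib.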

As shown above in Table 2, the SPL project directory has the impl/lib and impl/include subdirectories suitable for this purpose. In the native function model XML file, it is indicated as ../../impl/lib and ../../impl/include (../../ is a relative path to the impl directory that can be resolved from the location of the function model XML file). If your application is supported on multiple versions of Linux® and on 32- and 64-bit CPUs, it is necessary to provide different versions of the libraries in separate directories. To make it easier to automate this, this example uses a shell script (../../impl/bin/archLevel) that will automatically select the correct library location based on the Linux version and CPU (32-bit vs. 64-bit). If you read the archLevel script, you will understand how that is done. Finally, we have a library section to indicate our dependency on by specifying its name, the location of the library, and its include files.
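The idea of one library directory per platform can be sketched as below. This does not reproduce the archLevel script, which is a shell script shipped with the example. The function is hypothetical and only composes a directory name of the form CPU.OS, matching the x86_64.RHEL6 directory that appears in the build steps.

```python
import platform


def library_directory(base_dir, cpu, os_level):
    """Compose a per-platform library directory such as impl/lib/x86_64.RHEL6."""
    return "{0}/{1}.{2}".format(base_dir, cpu, os_level)


selected = library_directory("impl/lib", "x86_64", "RHEL6")

# The CPU part could come from the running machine; deriving the OS level
# would need distribution-specific logic that this sketch leaves out.
current_cpu = platform.machine()
```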

Building the example

This article includes the full source code for the example discussed here (see Downloads). A given Streams application can be compiled in two modes (stand-alone and distributed). In stand-alone mode, the entire SPL main composite is compiled into a single Linux executable. In the distributed mode, the SPL main composite is compiled as distributed components configured to run on one or more machines. If you have a test environment that meets the prerequisites, you can follow the instructions below to build the example:

  1. Obtain the file (see Downloads).
  2. Unzip the file to your home directory on your Linux machine that has Streams installed.
  3. Change to the ~/workspace1/StreamsToPythonLib C++ project directory.
  4. Create the .so shared library by running the ./mk script.
  5. The previous command creates the .so file, copies it to the ../../impl/lib/x86_64.RHEL6 directory, and copies the include files to ../../impl/include.
  6. Change to the ~/workspace1/streams-to-python SPL project directory.
  7. Create a stand-alone mode application by running the ./ script.
  8. Create a distributed mode application by running the ./ script.
  9. You should now see ~/workspace1/streams-to-python/output directory with stand-alone and distributed executables.

Running the example

A great feature in Streams allowed us to build both stand-alone and distributed applications without making changes to the source code. We can now run both as described below.

Stand-alone: This kind of Streams application is a single Linux executable that can be run without the need to start and stop a Streams runtime instance:

  1. Change to the ~/workspace1/streams-to-python SPL project directory.
  2. Run the ./ script.
  3. Skip to "Verifying the results" below.

Distributed: This kind of Streams application contains the Streams operators specified in the SPL flow graph compiled into many Processing Elements (PEs). These processing elements are distributed as individual Linux processes to make use of multiple CPU cores and a cluster of machines. In order to run a distributed mode Streams application, it is required to start a Streams instance, submit the application as a job on that Streams instance, collect the results, and stop the Streams instance:

  1. Ensure that you have already created a Streams instance.
  2. Change to the ~/workspace1/streams-to-python SPL project directory.
  3. Run this script with a command-line argument: ./ -i YOUR_STREAMS_INSTANCE_NAME.
  4. You should give your Streams instance name as an argument to the script in the previous step.
  5. Since it is a very simple application, it will finish quickly. Wait 60 seconds.
  6. You can stop the Streams instance now by running this script: ./ -i YOUR_STREAMS_INSTANCE_NAME.
  7. Continue with "Verifying the results" below.

Verifying the results: Regardless of whether you ran a stand-alone or a distributed application, our SPL program logic reads the web addresses from an input CSV file (data/UrlInput.csv) one line at a time. It calls the C++ native function to get network details about a given web address and writes the results into an output CSV file (data/UrlToIpAddress-Result.csv). Following are the web addresses already stored in the input CSV file of this example:


If the stand-alone or distributed application worked correctly, you should see the results in the data/UrlToIpAddress-Result.csv file. Your results should look similar to the ones packaged with this example, which come from a test run made at the time of this writing (data/Expected-UrlToIpAddress-Result-Feb2013.csv). The result for a given web address contains five comma-separated fields in this format: WebAddress, PrimaryHostName, AlternateHostNames, IPAddresses, CompanyName. The expected results appear at the end of this section.
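When checking a result file by hand becomes tedious, the five-field format just described can be parsed with a few lines of Python. The sketch below is a minimal example under stated assumptions: the sample line uses made-up placeholder values (example.com names and 192.0.2.x addresses), not output from a real run, and the helper names are hypothetical.

```python
import csv
from collections import namedtuple

ResultRow = namedtuple(
    "ResultRow",
    ["web_address", "primary_host_name", "alternate_host_names",
     "ip_addresses", "company_name"],
)


def parse_result_line(line):
    """Parse one quoted CSV result line into its five named fields."""
    fields = next(csv.reader([line]))
    if len(fields) != 5:
        raise ValueError("expected 5 fields, got %d" % len(fields))
    return ResultRow(*fields)


def bracketed_items(text):
    """Split a field such as "['a' 'b']" into the list ['a', 'b']."""
    return [item.strip("'") for item in text.strip("[]").split()]


# Placeholder values for illustration only.
sample = '''"www.example.com","example.com","['alias.example.com']","['192.0.2.1' '192.0.2.2']","example"'''
row = parse_result_line(sample)
addresses = bracketed_items(row.ip_addresses)
```

Comparing parsed rows from your own run against the packaged expected file is more forgiving than a byte-for-byte diff, since the addresses returned for a site can change over time.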

"","","['' '']","['' '' '' '']","cnn"
"","","['' '']","['']","ieee"
"","","['' '' '' '']","['']","yahoo"


Python has evolved nicely over the past two decades. As a dynamic programming language, it has an avid group of followers from academia to world-renowned companies. Its ease of use and programmer productivity gains are often cited as the key reasons for its success among the other top languages, such as C++, PHP, and the Java programming language.

IBM InfoSphere Streams is a market-leading event-processing platform that offers superior capabilities for big data analytics. It packs a powerful, flexible, and extensible programming model via its Streams Processing Language (SPL) supporting out-of-the-box integration features with business logic written in C++ and the Java language.

This article focused on bringing together the best capabilities of two worlds (SPL and Python). It showed a way to mix analytics code written in Python into Streams applications so that the Python code can take advantage of the scaling and distributed processing features of Streams. In addition to covering the integration of Streams and Python, this article introduced the mechanisms involved in calling arbitrary methods of a C++ class directly from SPL code.

In summary, we covered how to make a round-trip call chain between three languages (SPL<-->C++<-->Python). This article proved the concepts with fully working sample code (see Downloads). You can use the example to run as a stand-alone Linux application or as a distributed Streams application.


Download: SPL, C++, and Python (streams-to-python.zip, 26KB)


