Real-time data representation conversion in a distributed deployment using InfoSphere Streams native functions

Using the function overloading concepts of C++ with native functions of InfoSphere Streams

InfoSphere® Streams provides the capability to handle and report data or events as they happen, in real time. But running a distributed application across machines with different processor architectures can be challenging: because data representations vary between architectures, the application developer may need to convert the representation before applying the required logic. This is especially true when the data is in binary format. By performing data format conversion in real time, as soon as the data is read from the source, more consistent and meaningful results can be obtained.


Bharath Kumar Devaraju (bhdevara@in.ibm.com), Software Engineer, IBM

Bharath Kumar Devaraju has worked with IBM since 2009 and is currently working on InfoSphere Streams toolkit development. He is a QualityStage and DataStage certified solution developer. He has worked extensively on customer POCs and assisted in pre-sales activities for growth markets.



Mohan Dani (mohadani@in.ibm.com), DAAS Cloud Computing Lead, InfoSphere Streams, Enterprise Data Quality Architect, IBM

Mohan Dani is currently leading the DAAS Cloud Computing efforts, InfoSphere Streams, and has more than 11 years of experience in IT. He has extensive knowledge of implementing large enterprise solutions and has served in multiple roles as an IBM business analyst, development lead, and solution architect, specializing in data quality. He also consults for IBM internal teams for pre-sale, post-sale and delivery excellence, and thought leadership on data quality.



20 September 2012

Also available in Chinese and Portuguese

Introduction

When running a distributed real-time application, one challenge any application or solution developer encounters is variation in data formats. The first step is to convert the data format before proceeding with analysis and processing. In a real-time solution, data is fed from various sources running on various architectures. When data arrives from heterogeneous architectures in raw binary format, the results can be inconsistent or incorrect. IBM InfoSphere Streams offers native functions for packaging custom logic as reusable functions that can be called from a Streams Processing Language (SPL) application. In this article, we describe the steps for developing data format conversion routines as native functions by exploiting the function overloading concept of C++. We then describe how to use these functions as part of a Streams solution with the help of InfoSphere Streams Studio. The high-level implementation architecture is shown in Figure 1.

Figure 1. Real-time data format conversion
Image shows high-level data flow architecture of real-time data format conversion solution

Prerequisites

  • Business prerequisites: Readers of this article should have basic skills in designing and running SPL application jobs in InfoSphere Streams and intermediate-level skills in C++ programming.
  • Software prerequisites: InfoSphere Streams 2.0 or higher.

Preparing InfoSphere Streams Studio for native function development

SPL

SPL stands for Streams Processing Language. It is a distributed data flow composition language that supports rapid real-time application development. SPL is designed to provide control over the most performance-critical aspects of code. In particular, the programmer directly controls graph topology and data representations, and can choose to control the threading model, placement, physical layout, etc.

Following are the steps for creating a new native function.

  1. Create an SPL application from Streams Studio.
  2. Create a new native function definition by clicking the project and choosing New > SPL Native Function Model.
  3. For every new operator or native function in Streams, an operator model is created in which all the configuration needs to be specified. A snippet of the operator model is shown in Figure 2.
  4. Configure the operator model:
    • Specify the name of the header file containing the function logic.
    • Specify the prototypes of the functions present in the header file.
    Figure 2. External library dependency
    Figure shows structure of operator model and its various properties
  5. The values that need to be set for the various sections in the operator model are listed in Table 1.
Table 1. Native function model properties to be set

  • Section: Function Set
    Property: Cpp Namespace Name
    Description: The namespace in which the C++ functions are defined.
    Value: DataConvertors

  • Section: Function Set
    Property: Header File Name
    Description: The name of the header file in which the C++ functions are defined. The header file needs to be present in the resources/impl folder inside the SPL application.
    Value: DataConvertors.h

  • Section: Function Set > Prototypes (right-click and choose New Child > Function)
    Property: Prototype > Value
    Description: The function prototype. A prototype must be provided for each native function in the function model.
    Value: public void swap_endian(int64 x)
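The settings in Table 1 end up in the function model XML file. A minimal sketch of what that file might contain is below; the element names follow the SPL function model schema, but the file that Streams Studio generates may differ in detail, so treat this as illustrative rather than definitive.

```xml
<functionModel
    xmlns="http://www.ibm.com/xmlns/prod/streams/spl/function"
    xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common">
  <functionSet>
    <!-- Header file implementing the functions, under resources/impl. -->
    <headerFileName>DataConvertors.h</headerFileName>
    <!-- Must match the namespace used in the C++ code. -->
    <cppNamespaceName>DataConvertors</cppNamespaceName>
    <functions>
      <function>
        <description>Swap the byte order of a 64-bit integer</description>
        <prototype>public void swap_endian(int64 x)</prototype>
      </function>
    </functions>
  </functionSet>
</functionModel>
```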

C++ function code to perform data format conversion and integrating it with native function model of InfoSphere Streams

By utilizing the function overloading concept of C++, a separate function can be written for each data type; based on the parameter type, the appropriate endian conversion function is invoked. A native function in SPL follows the format shown in Listing 1:

    • The include section, where all external references are included.
    • The namespace, whose name must match the one provided in the function model.
    • The function logic for all native functions.
Listing 1. Format of the native function definition in InfoSphere Streams
#include ..
                
namespace <namespace_name>
{
    ret_type swap_endian(parameters..)
    { 
    ...
    }
}

Listing 2 shows a snippet of native function code that performs endian conversion. Based on the type of the parameter, the appropriate overload is invoked.

Listing 2. Sample code showing the logic for endian conversion of input of types short, int, and long
// Assumes a 16-bit unsigned short.
void swap_endian(unsigned short& x)
{
    x = (x >> 8) |
        (x << 8);
}

// Assumes a 32-bit unsigned int.
void swap_endian(unsigned int& x)
{
    x = (x >> 24) |
        ((x << 8) & 0x00FF0000) |
        ((x >> 8) & 0x0000FF00) |
        (x << 24);
}

// Assumes a 64-bit unsigned long (LP64, as on typical Linux hosts).
// The ULL suffixes make the masks 64-bit constants.
void swap_endian(unsigned long& x)
{
    x = (x >> 56) |
        ((x << 40) & 0x00FF000000000000ULL) |
        ((x << 24) & 0x0000FF0000000000ULL) |
        ((x << 8)  & 0x000000FF00000000ULL) |
        ((x >> 8)  & 0x00000000FF000000ULL) |
        ((x >> 24) & 0x0000000000FF0000ULL) |
        ((x >> 40) & 0x000000000000FF00ULL) |
        (x << 56);
}

Using the native function with the Functor operator in an SPL application

Once the native function is developed, the next step is to use it in an SPL application to perform data format conversion on data arriving from various sources in real time.

Below is sample SPL code for an application that invokes the native function within a Functor operator (Listing 3). The application reads input from a TCP port and performs the conversion on the input before propagating it further.

InfoSphere Streams provides various source and sink adapters for reading and writing data. Any number of sources and sinks can be used in a real-time SPL application.

Listing 3. Sample SPL application using native function with Functor operator to perform data format conversion
composite Main {
    graph
                
    stream<int32 inp> Src =  TCPSource()                                         
    {                                                                        
        param                                                                  
        role                : client;                                        
        address             : "inputdomain.com";                             
        port                : 23145u;                                         
    }                                    
    stream<int32 result> Funct = Functor(Src)
    {
        output
            Funct:result=swap_endian(Src.inp);
    }
                
    () as TCPWrite = TCPSink(Funct)
    {
        param
        address:"143.121.112.76";
        role : server;
        port : 21344u;
    } 
}

You are all set to launch your real-time data format conversion job: build and run it.


Exceptional conditions

  • InfoSphere Streams provides a debugger that supports debugging a real-time application. The Streams Debugger provides various commands and options, which can be easily used to trace and validate the output. More information on the Streams Debugger can be found at Debugging with the Streams Debugger.

Conclusion

We have shown how to perform data format conversion using InfoSphere Streams native functions, explained the configuration settings involved, and demonstrated how the conversion fits into an InfoSphere Streams application. Data format conversion serves as a key component in resolving the inconsistent and incorrect results caused by architecture variations in a distributed environment.

