Integrating InfoSphere Streams 3.0 applications with InfoSphere Information Server 9.1 DataStage jobs, Part 1: Connecting the Streams job and DataStage job

This article describes the integration architecture between InfoSphere® Streams and InfoSphere DataStage, and it provides step-by-step instructions for connecting InfoSphere Streams 3.0 applications with InfoSphere DataStage 9.1 jobs by using the InfoSphere DataStage Integration Toolkit for Streams and the InfoSphere DataStage Streams connector.

Stevan Antic (santic@us.ibm.com), Advisory Software Engineer, IBM

Stevan Antic is an advisory software engineer in Information Integration Solutions, Information Management Group, at the IBM Boca Raton, south Florida site. He has more than 14 years of software development experience at Mercator Software, Ascential Software, and IBM SWG.



Mike Koranda (koranda@us.ibm.com), Senior Technical Staff Member, IBM

Mike Koranda is a senior technical staff member in IBM's Software Group and has been working at IBM for more than 30 years. He has been working in the development of the InfoSphere Streams product for the past six years.



Paul Stanley (stanleyp@us.ibm.com), Senior Architect, IBM

Paul Stanley is a senior software architect in Information Integration Solutions, Information Management Group. He has been architecting and managing the development of connectivity components for InfoSphere Information Server and WebSphere Transformation Extender for over 15 years.



09 May 2013

Also available in Chinese

Introduction

As part of the InfoSphere Streams 3.0 and InfoSphere Information Server 9.1 releases, both products have been enhanced to enable easy integration between Streams applications and DataStage jobs.

This article provides technical details explaining how to create end-to-end integration scenarios between Streams applications and DataStage jobs. After reading this article you will become familiar with the following concepts:

  • Sending data from a Streams application to a DataStage job, including how to:
    • Create or modify an existing Streams SPL application to send a data stream to a DataStage job.
    • Export the modified Streams application metadata and import it into the Information Server environment.
    • Create or modify an existing DataStage job that will consume data from the Streams application.
  • Sending data from a DataStage job to a Streams application, including how to:
    • Create or modify an existing DataStage job to send a data stream to a Streams application.
    • Export the DataStage job metadata, and use the exported metadata to generate Streams application SPL stub code.
    • Create or modify an existing Streams Application using the generated SPL stub code.

This article is the first in a series of articles that will help you integrate two products that are the core of the IBM Big Data initiative. Part 1 will focus on a simple scenario that will familiarize you with the basic technical details of integrating these products. Part 2 in this series will provide a deep dive into metadata interfaces that can be used to connect the Streams near real-time analytics capabilities to the DataStage ETL engine and its rich connectivity features.


Integration architecture

Integration between InfoSphere Streams and InfoSphere DataStage is accomplished via a number of design time and runtime components that are provided by Information Server 9.1 and the InfoSphere DataStage Integration Toolkit for Streams.

The IBM InfoSphere DataStage Integration Toolkit provides Streams operators and commands that facilitate integration between IBM InfoSphere Streams and IBM InfoSphere DataStage. Integration of InfoSphere DataStage and InfoSphere Streams applications involves flowing data streams between the applications and configuring them to use the data. The integration occurs through an InfoSphere Streams connector on the DataStage side and a DSSource operator or a DSSink operator on the Streams side.
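
To make the roles of these operators concrete, the following minimal SPL sketch shows a Streams application that receives tuples from a DataStage job with a DSSource operator and sends tuples back with a DSSink operator. The endpoint names and the schema used here are illustrative placeholders, not part of the sample application developed later in this article.

use com.ibm.streams.etl.datastage.adapters::*;

composite EchoExample {
    // Hypothetical record layout; a real application defines its own schema.
    type Msg = tuple<rstring id, rstring payload>;
    graph
        // Receive tuples that a DataStage job sends over the named endpoint.
        stream<Msg> FromDataStage = DSSource() {
            param
                connectionName : "From_DataStage_Example";
                outputType     : Msg;
        }

        // Send the tuples (after any analytic processing) back to a DataStage job.
        () as ToDataStage = DSSink(FromDataStage) {
            param connectionName : "To_DataStage_Example";
        }
}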

By sending data to InfoSphere Streams from IBM InfoSphere DataStage, InfoSphere Streams can perform near real-time analytic processing (RTAP) at the same time as the data is loaded into a warehouse by IBM InfoSphere DataStage. Alternatively, when sending data from InfoSphere Streams to IBM InfoSphere DataStage, the InfoSphere Streams job performs RTAP processing, and then forwards the data to IBM InfoSphere DataStage to enrich, transform, and store the details for archival and lineage purposes.

The diagram shown in Figure 1 depicts an example of this connectivity.

Figure 1. Runtime data flow between the Streams and DataStage jobs
Runtime integration architecture that shows data flow between the Streams and DataStage jobs using the Streams connector and DSSource/DSSink operators.

Design time integration between Streams jobs and DataStage jobs is supported in the following two directions.

  • From Streams to DataStage. The process involves generating endpoint metadata files on the Streams side, importing them into the Information Server repository, and then using this metadata in the configuration of the Streams Connector in the DataStage job.
  • From DataStage to Streams. DataStage job metadata is imported by the DataStage Integration Toolkit on the Streams side, which generates SPL stub code that can be used in larger SPL applications.

The steps required to import metadata from Streams to DataStage are shown in Figure 2.

Figure 2. Design time integration steps from Streams to DataStage job
Design time integration steps from Streams to a DataStage job, using the generate-ds-endpoint-defs command of the Streams DataStage Integration toolkit and the IMAM Streams metadata bridge to transfer metadata into the metadata server.
  1. Run the DataStage Integration Toolkit's generate-ds-endpoint-defs command to gather endpoint details from one or more Streams application description language (ADL) files to create an application endpoint description file.
  2. Copy the endpoint description file from the Streams machine to a client machine where InfoSphere Metadata Asset Manager (IMAM) will be run.
  3. Run IMAM and select the IBM InfoSphere Streams connector. Select the endpoint description file and import the endpoint metadata into the Information Server repository.
  4. Design the DataStage job in DataStage Designer. After adding a Streams stage to the job, configure the stage and select the endpoint metadata. The metadata is used to populate the columns on the link and set the connection properties.

The steps required to import metadata from DataStage to Streams are shown in Figure 3.

Figure 3. Design time integration steps from DataStage to the Streams job
Design time integration steps from DataStage to the Streams job, using the generate-ds-spl-code command of the Streams DataStage Integration toolkit to generate SPL code.

The Streams side tasks can also be accomplished in Streams Studio.

For more information on the DataStage Streams connector and DataStage Integration Toolkit, refer to the Resources section.


Ping-Pong sample application

This section describes a sample application that demonstrates the technical details of the integration scenario. It defines the components and the metadata that describe the data to be exchanged between the Streams application and the DataStage job.

Ping: Sending data from Streams application to DataStage job

This section describes the following steps:

  1. Create a sample SPL application called Ping.spl. The Ping.spl job uses the Beacon operator to generate data and sends the data to the DataStage Pong job using the DSSink operator.
  2. Compile Ping.spl and export its metadata using the DataStage Integration Toolkit.
  3. Import the exported metadata into the Information Server metadata server using the IMAM Streams importer.
  4. Create the DataStage Pong job that receives data from the Streams Ping application. The Pong job uses the Streams connector stage to receive data sent from the Streams application and writes the received data to the Peek stage.
  5. Run the Streams Ping application, and run the DataStage Pong job.
  6. Review the Pong job data logged by the Peek stage for accuracy.

Create a sample SPL application Ping.spl

Ping.spl is a simple Streams application that sends the message "Hello DataStage!" with the message creation time stamp to a DataStage job using the DSSink operator, as shown in Listing 1.

Listing 1. Ping.spl Streams application that sends data to DataStage
1   use com.ibm.streams.etl.datastage.adapters::*;
2
3   composite Ping{
4
5   type messages = tuple< rstring MsgBody, timestamp CreatedDate > message;
6
7   graph
8      stream<messages> PingGenerator = Beacon() {
9        param iterations 	:	1u;
10       initDelay		:	1.0;
11       output PingGenerator  : 	message =
12       {
13           MsgBody         = "Hello DataStage!",
14           CreatedDate  = getTimestamp()
15        };
16     }
17
18      () as PingSender = DSSink(PingGenerator)
19      {
20         param  connectionName  : "To_DataStage_Ping";
21      }
22  }

Line 1 of the Ping.spl code imports definitions of DSSource and DSSink operators from the DataStage Integration toolkit. Line 5 declares the metadata of the message that is sent to DataStage. Lines 8 to 16 declare a Beacon operator that generates a single message to the stream. Lines 18 to 21 declare the DSSink operator. Line 20 declares the name of a TCP/IP connection that is used to send data from the Streams application to a DataStage job. The TCP/IP connection is registered with the Streams name server when the Streams job is started. This TCP/IP connection definition is known as an endpoint from the perspective of DataStage.
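
As an optional check once the application has been compiled and submitted (the submit step is shown later in Listing 4), you can list the jobs that are running on the instance to confirm that the Ping job, and therefore its registered endpoint, is active. The instance name below matches the one used elsewhere in this article.

# The submitted Ping job should appear in the output once it has been started.
$ streamtool lsjobs -i santic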

Compile Ping.spl and export its metadata using the DataStage Integration Toolkit

To compile Ping.spl, specify the following command, replacing the path for the -t option with the appropriate path to the Streams DataStage Integration toolkit installation directory:

$ sc -M PingDS -t /opt/ibm/InfoSphereStreams/toolkits/com.ibm.streams.etl \
    --output-directory=PingDS

The Streams DataStage Integration toolkit command that is used to generate the endpoint description file PingDS.endpoints.xml from the compiled ADL file is as follows:

$ /opt/ibm/InfoSphereStreams/toolkits/com.ibm.streams.etl/bin/generate-ds-endpoint-defs \
    PingDS/PingDS.adl

This command will generate the endpoint description file named PingDS.endpoints.xml, as shown in Listing 2.

Listing 2. Generated endpoint description XML file
1 <?xml version="1.0" encoding="UTF-8"?><endpointSet …>
2 <endpoint applicationName="PingDS" applicationScope="Default" confirmWireFormat="true"
3 flow="in" name="To_DataStage_Ping" protocol="tcp" role="server" tags="DataStage">
4 <wf:wireformat format="streams_bin" includesPunctuations="true">
5    <wf:schema>
6       <tt:attr name="message">
7          <tt:tuple>
8             <tt:attr name="MsgBody" type="rstring"/>
9             <tt:attr name="CreatedDate" type="timestamp"/>
10         </tt:tuple>
11      </tt:attr>
12   </wf:schema>
13 </wf:wireformat>
14 </endpoint>
15 </endpointSet>

On line 3, the attribute flow="in" declares the direction of the data flow relative to the DataStage job, the value "in" meaning inbound to DataStage. Lines 5 to 12 define the message schema.

Import the exported metadata into the Information Server metadata server

The generated endpoint description file is imported into the Information Server metadata server by using IBM InfoSphere Metadata Asset Manager (IMAM).

  1. Copy the endpoint description file PingDS.endpoints.xml to the computer where you run IMAM.
  2. Start IMAM and select the IBM InfoSphere Streams connector to import the endpoint metadata, as shown in Figure 4.
    Figure 4. Importing the endpoint description file using IMAM
    To import the Streams endpoint definition file using IMAM, configure the new IMAM import area by selecting the Streams connector.
  3. Click Next to display the Streams importer.
  4. Select the endpoint description file and complete the import process, as shown in Figure 5.
    Figure 5. Selecting the endpoint description file
    We configure the import area by selecting the endpoint description file from the local file system.
    The import process will create objects representing the endpoint in the repository, which can subsequently be used in a DataStage job.

Create a DataStage Pong job that receives data from the Streams Ping application

The next step is to create a DataStage parallel job named Pong that receives data from the Streams Ping application.

  1. Drag and drop the Streams connector stage and the Peek stage from the Palette to the parallel job canvas.
  2. Add an output link from the Streams connector to connect it to the Peek stage.
  3. Edit the Streams connector stage and click the Configure button to configure the Streams connector columns and properties.
  4. Select the endpoint Default/To_DataStage_Ping, as shown in Figure 6, which was imported into the repository via IMAM in the previous step.
    Figure 6. Configuring the Streams connector link schema
    To configure the Streams connector column interface, open the stage GUI, click the Configure button, and select the endpoint from the list.
    Default is the default name of the application scope, and To_DataStage_Ping is the name of the endpoint as specified in the connectionName parameter in the Ping.spl Streams application.
  5. Click OK, then inspect the column definition for the link and see that it corresponds to the message defined by Ping.spl, as shown in Figure 7.
    Figure 7. The Streams connector link schema
    The Columns tab of the stage GUI displays the schema of the Streams connector output link.
  6. Set up the connector properties. The Name server host and port are set to the values returned by the streamtool geturl command, which is issued on the Streams server. Line 2 shows the results of issuing this command:
    1  bash-3.2$ streamtool geturl -i santic
    2  https://9.22.116.149:8443/streams/console/login
  7. The Username and Password properties are set to the values used to log in to the Streams console using the URL returned by the geturl command, as shown in Figure 8.
    Figure 8. The Streams console login page
    The connection properties Username and Password should be the same as those used to log in to the Streams console.
  8. The keystore file must be exported from the Streams server and imported on the DataStage server. This operation only needs to be performed once. The keystore file is exported from the Streams server where the Streams instance is running by using the keytool command, as shown in Listing 3.
    Listing 3. Keystore file exported
    $ /opt/ibm/java-x86_64-60/bin/keytool -keystore \
        ~/.streams/instances/santic\@santic/sws/security/keystore/ibmjsse2.jts \
        -export -alias lwiks -file santic.keystore
    Enter keystore password:
    Certificate stored in file <santic.keystore>
  9. Import the keystore certificate by using the keytool command that is provided under the ASBNode or ASBServer directories of the DataStage engine installation (an optional verification step is sketched after this list):
    $ /opt/InformationServer/ASBNode/apps/jre/bin/keytool -import -alias lwiks \
        -file santic.keystore -keystore santic.certificate.keystore
  10. Enter the path to the keystore file into the Keystore file property and enter the remaining required connection properties, as shown in Figure 9.
    Figure 9. The Streams connector properties page
    We set up the required connection properties to complete the Streams connector configuration.
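
As an optional sanity check for step 9, you can list the contents of the truststore created on the DataStage server to confirm that the certificate was imported. The paths below match the earlier example and should be adjusted for your installation; the lwiks alias should appear as a trusted certificate entry.

# List the entries in the truststore created by the import in step 9.
$ /opt/InformationServer/ASBNode/apps/jre/bin/keytool -list \
    -keystore santic.certificate.keystore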

Run the Streams Ping application and the DataStage Pong job

The Streams Ping application is executed by using the streamtool submitjob command, as shown in Listing 4.

Listing 4. Streams Ping application executed
$ streamtool submitjob -i santic PingDS/PingDS.adl
CDISC0079I The system is submitting 1 applications to the santic@santic instance.
CDISC0080I Job ID 37 was submitted for the application that is stored at the following 
path: PingDS/PingDS.adl.
CDISC0020I Submitted job IDs: 37

Now that the Streams job has been started, the DataStage Pong job can be compiled and executed. The Streams connector will contact the Streams name server to look up the host and port for the To_DataStage_Ping connection. It then establishes a TCP/IP connection to that host and port. Once the connection has been established, the Streams DSSink operator will begin to send messages to the Streams connector. The received message can be observed in the Job log dialog, as shown in Figure 10.

Figure 10. DataStage Pong job, and the job log after successful run
After successful execution of the DataStage Pong job, the job log in the Designer client displays the "Hello DataStage!" message received from the Streams Ping job, and the green status of the data link signals job completion.

Pong: Returning data from DataStage job to the Streams application

In this section, you will do the following:

  • Modify the Pong job to add the Streams connector that will send Ping data back to the Streams Ping application.
  • Save and compile the Pong job. This will store the job metadata in the Information Server metadata repository.
  • Use the Streams DataStage Integration Toolkit to import the Pong job metadata from the Information Server metadata server. This will generate SPL stub code utilizing the DSSource operator.
  • Integrate the SPL stub code into Ping.spl code. This will enable the Streams Ping application to receive data from the DataStage Pong job.
  • Modify the Ping.spl job to add the FileSink operator to store DSSource data in the Pong.txt file.
  • Run the Streams Ping application and run the DataStage Pong job.
  • Review the Pong.txt data for accuracy.

Modify the DataStage Pong job to return data to Streams

  1. Add a Copy stage and another Streams connector to the Pong job and add the links, as shown in Figure 11.
    Figure 11. Modified DataStage Pong job with the Streams connector added as data target
    We replaced the target Peek stage with the Copy stage; the Copy stage has two output links that connect to the Streams connector and to the Peek stage.
  2. The columns for all links should be the same and can be set by using the Mapping tab of the Copy stage. Edit the target Streams connector and define its properties, as shown in Figure 12.
    Figure 12. The target Streams connector property page
    The target Streams connector property page is set up similarly to the source Streams stage; the Connection name property has to be unique because it defines the endpoint name of the Streams DSSource operator that receives the data.
  3. The connection properties can be copied from the source Streams connector. One way to accomplish that is to use the 'Save' function in the source connector, and the 'Load' function in the target connector. The connection name should be set to 'From_DataStage_Pong'. This will define the name of the endpoint that will be registered with the Streams name server. After setting up these properties, save and compile the job.

Import the DataStage job metadata to Streams

  1. On the Streams server, use the generate-ds-spl-code tool from the DataStage Integration Toolkit to generate SPL code that corresponds to the Pong job.
  2. The tool requires selecting a project to import from, DataStage credentials, the name of the job to import, and the stage within the job to import from. Listing 5 shows an example of the interactive invocation of the tool.
    Listing 5. Interactive invocation example
    $ /opt/ibm/InfoSphereStreams/toolkits/com.ibm.streams.etl/bin/generate-ds-spl-code
    Enter the server to connect to the data stage repository.
    9.184.184.63:9080
    INFO - CDIST1265I - Using server: 9.184.184.63:9080
    Enter the user id to connect to the data stage repository.
    dsadm
    INFO - CDIST1266I - Using user: dsadm
    Enter the password to connect to the data stage repository.
    *****
    Enter the number of the project below or '0' to exit.
    1) IPSHYD53:dstage1
    2) IPSHYD53:dstage2
    3) IPSHYD53:Streams_dev
    3
    INFO - CDIST1267I - Using project: IPSHYD53:Streams_dev
    Enter the number of the job below or '-1' to see the project list or '0' 
    to exit.
    1) CopyOfSendRcvAllPrimitives
    2) Pong
    3) PongV1
    4) RcvMisc
    2
    INFO - CDIST1268I - Using job: Pong
    Enter the number of the connection below to add or '-2' to generate or '-1' 
    to see the job list or '0' to exit.
    1) ToStreams target_stage
    2) FromStreams source_stage
    1
    Enter the number of the connection below to add or '-2' to generate or '-1' 
    to see the job list or '0' to exit.
    1) ToStreams target_stage*
    2) FromStreams source_stage
    -2
    INFO - CDIST1260I - Creating connection: ToStreams
    INFO - CDIST1262I - Completed file generation to file: 
    DataStageIntegration.spl
  3. The tool generates a stub SPL file named DataStageIntegration.spl that can then be modified to add additional logic, or it can be used within larger applications. The generated file has the contents shown in Listing 6.
    Listing 6. Generated stub SPL file
    1  use com.ibm.streams.etl.datastage.adapters::*;
    2
    3    type
    4      ToStreams_Schema =
    5        timestamp message_CreatedDate,
    6        rstring message_MsgBody;
    7
    8  /* The following Main composite contains the DSSource and DSSink operators 
    used to connect to a datastage job. */
    9  /* For convenience a Beacon operator that generates sample data is provided 
    for each DSSink. */
    10 /* For convenience a Custom operator that prints out the tuple and punctuation is 
    provided for each DSSource. */
    11 composite Pong {
    12   graph
    13
    14 /* Start of Connection : ToStreams */
    15 /* The following DSSource operator is used to receive data from a datastage job.*/
    16     stream<ToStreams_Schema> ToStreams_source_stage_outputStream = DSSource() {
    17       param
    18         connectionName : "From_DataStage_Pong";
    19         outputType     : ToStreams_Schema;
    20   }
    21     /* The following Custom can be used to show the data sent to the DSSource */
    22     () as ToStreams_source_stage_outputStream_Custom = Pong_datastage_Custom(
    ToStreams_source_stage_outputStream) {}
    23 /* End of Connection : ToStreams */
    24     // Warning: applicationScope in DataStage was not set by connector name: 
    ToStreams
    25     // a default applicationScope was indicated by connector name: ToStreams 
    so the config applicationScope : <scope> clause will be left off
    26 }

    Lines 3 to 6 define the message structure that was defined by the input link of the ToStreams Streams connector. Line 18 shows the connectionName parameter, which was obtained from the Connection name property defined in the stage.
  4. Before using this composite in a job, rename the file to Pong.spl and replace lines 21 to 25 with the code shown in Listing 7, which will write the data sent to Streams to a file named Pong.txt.
    Listing 7. Code to write data to Pong.txt
    21     () as Sink2 = FileSink(ToStreams_source_stage_outputStream)
    22     {
    23        param
    24        file : "Pong.txt";
    25        format : txt;
    26        flush : 1u;
    27     }
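
    After the rename and the replacement, the resulting Pong.spl should look roughly like the following sketch. The generated comments are trimmed, and the FileSink operator reads the output stream produced by the DSSource operator.

    use com.ibm.streams.etl.datastage.adapters::*;

    type
        ToStreams_Schema =
            timestamp message_CreatedDate,
            rstring message_MsgBody;

    composite Pong {
        graph
            // Receive data sent by the DataStage Pong job over the From_DataStage_Pong endpoint.
            stream<ToStreams_Schema> ToStreams_source_stage_outputStream = DSSource() {
                param
                    connectionName : "From_DataStage_Pong";
                    outputType     : ToStreams_Schema;
            }

            // Write every received tuple to the Pong.txt file in the application data directory.
            () as Sink2 = FileSink(ToStreams_source_stage_outputStream) {
                param
                    file   : "Pong.txt";
                    format : txt;
                    flush  : 1u;
            }
    }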

Modify Ping.spl to incorporate the generated code

The next step is to modify Ping.spl to use this generated composite.

  1. Edit Ping.spl and add the following as the last line of the Ping graph definition:
     () as DataFromPong = Pong (){}
  2. Save the file and recompile the application using the commands shown previously.
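
For reference, a sketch of the modified Ping.spl follows. It is the same as Listing 1 except for the added invocation of the generated Pong composite; the exact formatting in your file may differ.

use com.ibm.streams.etl.datastage.adapters::*;

composite Ping {
    type messages = tuple<rstring MsgBody, timestamp CreatedDate> message;
    graph
        // Generate a single "Hello DataStage!" message.
        stream<messages> PingGenerator = Beacon() {
            param
                iterations : 1u;
                initDelay  : 1.0;
            output PingGenerator : message = { MsgBody = "Hello DataStage!",
                                               CreatedDate = getTimestamp() };
        }

        // Send the message to the DataStage Pong job.
        () as PingSender = DSSink(PingGenerator) {
            param connectionName : "To_DataStage_Ping";
        }

        // Invoke the generated Pong composite, which receives the data returned by
        // the DataStage job and writes it to the Pong.txt file.
        () as DataFromPong = Pong() {}
}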

Run the Streams Ping application and the DataStage Pong job

Before running the modified Ping application, it is necessary to stop the previous Ping job.

  1. Use the streamtool canceljob command to perform this operation, specifying the job number that was displayed on the console when the job was submitted:
    $ streamtool canceljob -i santic 37
    CDISC0021I Job ID 37 of the santic@santic instance was cancelled.
  2. Submit the Ping job as before. When the Ping job starts it will register both the To_DataStage_Ping and From_DataStage_Pong endpoints, and will then wait for the DataStage job to start and to connect to the endpoints.
  3. Once the Streams job has been started, start the DataStage job. It will receive a message from Streams, and then return the same message back to the Ping job, as shown in Figure 13.
    Figure 13. DataStage Pong job, and the job log after successful run
    After successful execution of the DataStage Pong job, the job log in the Designer client displays the "Hello DataStage!" message received from the Streams Ping job; the message is sent back to the Streams Ping application, and the green status of the data links signals successful job completion.
  4. To ensure that the data was correctly received on the Streams side, look at the Pong.txt file that will have been created in the data directory:
    $ more data/Pong.txt
    {message_CreatedDate=(1365105503,133589000,0),message_MsgBody="Hello DataStage!"}

Conclusion

In this article, we demonstrated how to integrate a DataStage job and a Streams job to exchange data in both directions. We created a simple real-time ETL application using the new features released in Information Server 9.1 and InfoSphere Streams 3.0, and provided the techniques and code samples that you need to build your own real-time ETL applications.

This is the first part of a series on this topic. In the second part, we will discuss the complex data and metadata interfaces that can be used to integrate Streams applications with DataStage jobs.


Acknowledgements

We would like to thank Steven Friedland for his feedback and reviews of this article.


Download

Description                    Name             Size
Sample jobs for this article   sampleJobs.zip   8KB

Resources

