Feed InfoSphere Streams applications with live data from databases
Use InfoSphere Data Replication CDC as a source for real-time analytical processing
Business intelligence and analytics have been slowly transitioning from predominantly batch-oriented to more real-time systems. It might not be feasible for some data to be put at rest before using it for analytics, such as when a company wants to monitor for certain events. In an interconnected and instrumented world, some of the data used for analytics may come from devices such as cameras, microphones, sensors, or other sources. However, many companies generate a good amount of their data using online transactional processing (OLTP) applications and store it in transactional source databases. In many cases, you may want to apply analytical processing on this transactional data and possibly trigger events or alerts or send the data to a dashboard.
Data replication is the most automated and efficient way to deliver near real-time information from transactional databases to analytical processes. Data replication is designed to move only the data that is changed in the source system, rather than moving the entire source data set. InfoSphere Data Replication's CDC detects changes from the source database logs as they are written and committed by the transactional applications. When the capture engine finds newly changed data on the source, it pushes the changes to the apply engine. To handle the changes, you have a variety of choices, such as writing to another database table or flat file, sending the change as an XML message on a JMS queue, or calling a user exit program for customized processing. The changed data is captured from the source database log, transported over the network, and handled by the target engine without passing through any intermediate disk storage mechanism.
In this tutorial, learn how to connect CDC as a near real-time data source for InfoSphere Streams, explore several integration options, and download sources for the user exit and customize them to fit your needs.
The image below shows a high-level overview of how data flows from the database log to its consumers. Table changes made by the applications are written in the database log and picked up by the CDC source engine. After a transaction is committed, the CDC source engine sends it to a target engine that then takes care of writing changes to a database table, a flat file, or a message queue. Alternatively, you can choose to invoke a user exit for custom processing.
Figure 1. Flow of data from database to consumers
Sample use case: Monitoring spending on mobile phone calls
A communications service provider (CSP) wants to provide a new service whereby mobile phone subscribers can specify a personal threshold of costs per month. If use of a mobile device causes the current balance to exceed this maximum, the customer receives a notification by email and SMS. The new feature serves two purposes:
- Customers can better control device usage.
- The CSP anticipates fewer situations where bills cannot be paid.
Overall, the new service should result in a better customer experience.
To implement the new functions, the CSP initially looked at changing the billing application. It was concluded, however, that it would take months before they could roll this out in production. For a faster time to market, they considered an approach in which billing information would be retrieved directly from the database tables. Billing information is typically kept as "rated" call data records (CDRs). Unfortunately, the performance impact of polling these tables with standard query techniques would be prohibitive.
Because the billing application runs against a standard RDBMS, they had the idea to use the database log as the source for detecting events. CDC can pick up the change of the tables as they occur and trigger an application that handles the event. For flexibility and scalability, the customer chose to develop an InfoSphere Streams application fed with the changes by CDC.
This use case would be straightforward to implement, although it does not really leverage the strong analytical capabilities of InfoSphere Streams. A more sophisticated example would involve monitoring the CDRs per subscriber and notifying the customer service center if there are anomalies in the calling behaviour, for example, when a subscriber suddenly makes international calls to countries she never called before, or when the current plan is no longer optimal for the subscriber.
Table and tuple definitions
To detect a balance reaching its threshold, there are two main database tables in scope: one for the rated CDRs (RATED_CDR), as shown in Table 1, and a reference table (CUST_THRESHOLD), which keeps the personal thresholds for every subscriber, as shown in Table 2. We purposely kept the technical representation of the database tables simple for this example.
Table 1. RATED_CDR
Table 2. CUST_THRESHOLD
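The detection logic these tables support boils down to comparing a subscriber's accumulated monthly charges against the personal threshold. A minimal sketch in Python, with illustrative names (the real check would be implemented inside the InfoSphere Streams application):

```python
def threshold_reached(monthly_total, max_monthly_charge, threshold_percentage):
    """True when accumulated charges reach the subscriber's personal
    threshold, expressed as a percentage of the monthly maximum."""
    return monthly_total >= max_monthly_charge * threshold_percentage / 100.0

# A subscriber with MAX_MONTHLY_CHARGE 80.00 and THRESHOLD_PERCENTAGE 90
# should be notified once the rated CDR costs total 72.00 or more.
notify = threshold_reached(72.50, 80.00, 90.0)
```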
For the RATED_CDR example table, the InfoSphere Streams tuple to be received by the source operator would look like Listing 1. The tuples have been generated with the example tuple generation program included in the Downloadable resources section.
Listing 1. rated_cdrT
TelcoRated_cdrT = tuple<rstring aud_commit_timestamp, rstring aud_transaction_id,
    rstring aud_entry_type, rstring aud_user, rstring b_msisdna, rstring b_msisdnb,
    rstring b_start_time, rstring b_end_time, float64 b_duration_sec, rstring b_cost,
    rstring msisdna, rstring msisdnb, rstring start_time, rstring end_time,
    float64 duration_sec, rstring cost>;
For the CUST_THRESHOLD table, the tuple definition is shown in Listing 2.
Listing 2. cust_thresholdT
TelcoCust_thresholdT = tuple<rstring aud_commit_timestamp, rstring aud_transaction_id,
    rstring aud_entry_type, rstring aud_user, rstring b_msisdn, rstring b_name,
    float64 b_max_monthly_charge, float64 b_threshold_percentage, rstring msisdn,
    rstring name, float64 max_monthly_charge, float64 threshold_percentage>;
You can use these tuple definitions when processing CDC flat files and when the entries are sent to the InfoSphere Streams application via the user exit.
To follow along with the examples, you need the following components:
- InfoSphere Data Replication 10.2 CDC Management Console
- InfoSphere Data Replication 10.2 CDC Access Server
- One of the InfoSphere Data Replication CDC Source Engines
- InfoSphere Streams
Depending on your choice to replicate via flat files or a user exit, you will need one of the following InfoSphere Data Replication CDC target engines:
- Flat files: InfoSphere Data Replication 10.2 CDC for InfoSphere DataStage
- User exit: InfoSphere Data Replication 10.2 CDC Event Server
CDC Event Server was designed to target Java™ Message Service (JMS) queues, but you can also use it as a general purpose target and invoke user exits based on the replication events. After you've installed CDC Event Server, create an instance that will eventually serve as the target engine. During the instance creation, you must specify a JMS provider library. The only mandatory library is jms.jar, which is freely downloadable from IBM MessageSight JMS Client Pack.
Or, if your source engine is one of the Java engines (Oracle, DB2® for Linux®, UNIX®, and Windows®, SQL Server, Informix®, etc.), you can use that engine as a target and configure a loop-back replication.
Depending on your environment and latency requirements, you have a couple of choices for integrating InfoSphere CDC with your InfoSphere Streams application:
- Use the CDC for DataStage engine to stage transactions into flat files, after which the InfoSphere Streams application can process them in batches (Scenario 1).
- Deploy a CDC target engine user exit to directly feed the InfoSphere Streams application via a TCP/IP socket or a named pipe (Scenario 2).
This article discusses the pros and cons of both of these integration methods and provides application examples for illustration.
Independent of which choice you make, a CDC source engine captures the changes from the database log and when a transaction is committed, its operations are sent to the CDC target engine to be applied.
Feeding an InfoSphere Streams application with data coming from a transactional system is a natural fit for event processing (as discussed in Sample use case). Processing data from such a system, however, should not be confused with transactional processing by the InfoSphere Streams application. A transaction, also called a unit of work, is persisted in the database when a commit is performed. If the transactions were replicated to a CDC database engine and applied to a target database, CDC would retain transaction consistency and perform a database-commit operation on the target end. If the target side is an InfoSphere Streams application, CDC cannot perform this two-way commit. As soon as operations are dispatched to the InfoSphere Streams application via the user exit, CDC is no longer aware whether or not they have been fully processed.
Avoid this approach when you cannot afford losing or duplicating a record change, or even an entire transaction. For such applications, Enterprise Service Buses (ESBs) and Extract, Transform, and Load (ETL) solutions are better suited. InfoSphere Streams applications excel when fast analysis and event handling on large volumes of moving data are required.
Scenario 1: Staging into flat files
InfoSphere Data Replication supports replication to comma-separated value (CSV) flat files, ready for immediate use, through its CDC for DataStage target engine, as shown in Figure 2. Per replicated table, you can designate the directory in which the flat files are created.
Figure 2. Staging into flat files
Flat files follow the naming convention <table_name>x<date>T<time>, where x is D for a hardened flat file or @ for the flat file that is currently being written to. In the CDC subscription's DataStage properties, two parameters control how often files are hardened: number of rows and number of seconds, as in Figure 3. When either of these thresholds is reached at a transaction boundary for any of the tables replicated by the subscription, all currently open files (with an @ symbol in their name) are closed and renamed to <table>D<date>T<time>. This ensures that the changes from all replicated tables up to a certain point can be processed.
Figure 3. InfoSphere DataStage properties
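A consumer scanning the output directory only needs to pick out the hardened files. The check can be sketched in Python; the file names observed later in this tutorial (CUST_THRESHOLD.D...) include a dot before the marker, and the 8-digit date / 6-digit time layout is an assumption here, so adapt the pattern to what your CDC version actually produces:

```python
import re

# Hardened CDC for DataStage files: <table>.D<date>T<time>; files still
# being written carry @ instead of D. The timestamp layout is assumed.
HARDENED = re.compile(r'^(?P<table>\w+)\.D\d{8}T\d{6}$')

def is_hardened(filename):
    """True for files that CDC for DataStage has closed and renamed."""
    return HARDENED.match(filename) is not None
```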
The InfoSphere Streams application should use the DirectoryScan operator to continuously scan the output directory assigned in the CDC table mapping for new files. When a new file is hardened by CDC for DataStage, the entries in the flat file can be processed using a FileSource operator. The example CDCFlatFile.spl InfoSphere Streams application, which you can download, demonstrates how to scan a directory for hardened flat files and process the contents of the files.
Because your InfoSphere Streams application can only depend on the changes in the hardened files, you would not achieve very low latencies between the transaction being generated at the source and the time it has been fully processed by the InfoSphere Streams application — typically in the range of minutes rather than seconds. One method to reduce the latency is to change the batch size thresholds to a lower value, but take into consideration that this may cause CDC to generate a huge number of flat files depending on the transaction volume.
The main advantage of using flat files is that CDC supports them as is, with no modifications, so implementation is straightforward.
Your InfoSphere Streams application should read the flat files as CSV format and indicate that each of the columns is surrounded by a quotation mark ("). The first four fields of each line contain:
- Timestamp in ISO format
- Transaction ID
- Operation type (I for insert, U for update, D for delete)
- User who performed the operation at the source
Following the four fixed fields, you'll find the before image of the updated or deleted row followed by the after image of the inserted or updated row. In the case of an insert operation, the before image will be empty, but the placeholders for the fields are there. In the case of a delete operation, the empty after image placeholders will be written, too.
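Putting that layout together, a small Python sketch of splitting one flat-file record into the four fixed fields plus before and after images (the sample line and its four-column table are illustrative):

```python
import csv

def parse_flat_file_record(line, column_count):
    """Split a CDC flat-file record: 4 fixed audit fields, then
    column_count before-image fields, then column_count after-image fields."""
    fields = next(csv.reader([line], quotechar='"'))
    return {
        "timestamp": fields[0], "tx_id": fields[1],
        "op": fields[2], "user": fields[3],
        "before": fields[4:4 + column_count],
        "after": fields[4 + column_count:4 + 2 * column_count],
    }

# An insert operation: the before-image placeholders are present but empty
row = parse_flat_file_record(
    '"2014-06-25 18:19:24","152151","I","DB2INST1","","","","",'
    '"33612939415","Iris Bonne Maman","80.00","86"', 4)
```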
It is recommended that you isolate the CDC for DataStage engine from the InfoSphere Streams cluster to avoid memory and CPU contention. Alternatively, consider designing your application such that only the InfoSphere Streams processing element (PE) processing the flat files runs on a cluster node that also holds InfoSphere CDC.
If the data replicated from the source system could contain quotation marks, you must customize the CDC for DataStage flat file output using a custom formatter, which allows using a different column separator, such as |. An example custom formatter user exit that already uses the pipe character as the column separator is provided in Downloadable resources.
Scenario 2: Feeding InfoSphere Streams directly
When the time between the transaction generated at the source and the InfoSphere Streams application processing has to be short (less than a few seconds, though sub-second is often achievable in well-tuned environments), staging the data into flat files is not feasible. By using a user exit that hooks into the CDC subscription target side, record changes received from the source are passed on to the InfoSphere Streams application immediately.
The user exit is called for every insert, update, and delete operation, together with the change data from the source side; it then formats an output record and transports it to the InfoSphere Streams application.
Two methods for feeding InfoSphere Streams directly are provided as part of this tutorial:
- Using a TCP/IP socket where the user exit is the client and the InfoSphere Streams application is the server
- Using Linux/UNIX-named pipes to pass data via inter-process communications
By using a TCP/IP socket or inter-process communications mechanism, low latencies can be achieved between the transaction being generated at the source and the time it has been fully processed by the InfoSphere Streams application. It is also scalable, as there is no intermediate staging to disk for the changed records.
In this section, we will cover both methods in a bit more detail and discuss the sample user exit. You can download the source, as well as the compilation and deployment instructions, for the sample user exit.
Feeding a TCP/IP socket
InfoSphere Streams applications are capable of receiving data directly from TCP/IP sockets. The CDC user exit can connect to a TCP/IP listener running on the InfoSphere Streams cluster and pass along data to the application via this channel, as outlined in Figure 4.
Figure 4. Feeding a TCP/IP socket
The InfoSphere Streams application then picks up the data from the socket using the
TCPSource operator, which is set up to act as a server. As
long as the InfoSphere Streams application is active, CDC will be able to write to
the socket. You can download the CDCTCPSocket.spl
example InfoSphere Streams application, which demonstrates how to read from a TCP/IP
socket and to process the contents.
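To get a feel for this handshake without a full CDC installation, the following Python sketch emulates both sides: a server thread standing in for the TCPSource operator and a client standing in for the user exit, exchanging one newline-terminated change record over localhost (port and payload are illustrative):

```python
import socket
import threading

received = []

def streams_side(server_sock):
    # Stand-in for TCPSource in server mode: accept one connection and
    # read newline-terminated change records until the peer disconnects.
    conn, _ = server_sock.accept()
    with conn, conn.makefile("r") as reader:
        for line in reader:
            received.append(line.rstrip("\n"))

server = socket.socket()
server.bind(("127.0.0.1", 0))            # pick a free ephemeral port
server.listen(1)
listener = threading.Thread(target=streams_side, args=(server,))
listener.start()

# Stand-in for the CDC user exit: connect and push one change record
with socket.create_connection(server.getsockname()) as client:
    client.sendall(b"TELCO.CUST_THRESHOLD|2014-06-25 12:35:59|149363|U|DB2INST1|33612939415\n")

listener.join()
server.close()
```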
In this implementation, the CDC target engine can (and should) be installed outside of the InfoSphere Streams cluster. It only connects to the InfoSphere Streams application via the network and passes along all the change data via the network.
Feeding a named pipe
A named pipe is a Linux/UNIX vehicle for establishing inter-process communications, as in Figure 5. On the OS, it is represented by a special type of object, called a fifo file, created through the mkfifo command. A source process can write data to the pipe as long as there is a process on the other end of the pipe that reads it.
Figure 5. Feeding a named pipe
If the user exit has been configured to write to a named pipe, the
InfoSphere Streams application picks up the data from the named pipe using the
FileSource operator. As long as the InfoSphere Streams application is
active, CDC will be able to write to the named pipe.
The CDCNamedPipe.spl example InfoSphere Streams application, which is included in Downloadable resources, demonstrates how to read from a named pipe and process the contents.
Most network file systems do not support named pipes, so you must install the CDC target engine on one of the InfoSphere Streams cluster nodes. You can mitigate impact and avoid contention by dedicating one of the nodes to the InfoSphere Streams PE that reads from the named pipe and the CDC target engine.
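The same rendezvous can be sketched for a named pipe in Python (Linux/UNIX only; the pipe path and payload are illustrative). The writer thread stands in for the CDC user exit, and the reading side for the FileSource operator:

```python
import os
import tempfile
import threading

fifo = os.path.join(tempfile.mkdtemp(), "cdc_pipe")
os.mkfifo(fifo)                          # the fifo file FileSource would read

def user_exit_side():
    # Opening a fifo for writing blocks until a reader shows up, just as
    # the user exit needs the InfoSphere Streams application to be active.
    with open(fifo, "w") as pipe:
        pipe.write("TELCO.CUST_THRESHOLD|2014-06-25 12:36:44|149364|U|DB2INST1|44722333444\n")

writer = threading.Thread(target=user_exit_side)
writer.start()

with open(fifo) as pipe:                 # the FileSource side
    records = [line.rstrip("\n") for line in pipe]

writer.join()
os.unlink(fifo)
```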
Configuring the user exit
The user exit facilitates replicating changes captured using InfoSphere CDC to an InfoSphere Streams application. Typically, the user exit is configured for a subscription going from a database to CDC Event Server so it does not require a target database. However, it also works for database targets with a CDC Java engine and could even be implemented as a loopback replication.
You can choose to pass the replicated events to either a TCP/IP socket or a named pipe (fifo file). The format of the information passed, called a tuple, is character-separated values, where the separator can be defined in the properties file. There are several fixed fields in the record passed, followed by information identifying the change. The tuple has the following format:
- The table schema and table name, consolidated in one field and included in every replicated tuple. Because all replicated rows are written to the same physical target (TCP/IP socket or named pipe), you need to be able to identify the table in which the row was changed.
- An ISO timestamp (yyyy-mm-dd hh24:mi:ss:ffffff) that indicates when the transaction was committed at the source. It resembles the &TIMSTAMP journal control field.
- The identification of the unit of work. All operations in a database transaction have the same transaction ID.
- The column that holds the type of operation (I=Insert, U=Update, D=Delete) performed at the source.
- The source database user who performed the database transaction.
- One field per source database column selected for replication. These fields hold the before image of the updated or deleted row. In the case of an insert, these fields are empty but will be present in the output format.
- One field per source database column that is selected for replication. These fields hold the after image of the updated or inserted row. In the case of a delete, these fields are empty but will be present in the output format.
All fields are separated by the column separator configured in the properties file; the default separator is a pipe (|). All tuples end with the Linux/UNIX newline character, which ensures that InfoSphere Streams can correctly identify the end of the tuple.
The only difference between the data received from the CDC flat file and data received from the user exit is that the user exit prepends the record with a fully qualified table name. This is required because changes from all tables are directed to the same listener or named pipe. It is expected from the InfoSphere Streams application that it first retrieves the table name from the received change record and sends it to the appropriate output port for further processing.
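That first processing step can be sketched in Python: peel off the table name, then hand the remaining payload to the parser for that table (the record below is modeled on the named-pipe output shown earlier; | is the default separator):

```python
def split_change_record(record, separator="|"):
    """Split a user-exit record into (fully qualified table name, payload).
    The table name is always the first field."""
    table, payload = record.split(separator, 1)
    return table, payload

record = ("TELCO.CUST_THRESHOLD|2014-06-25 12:35:59.000000000000|149363|U|"
          "DB2INST1|33612939415|Iris Bonne Maman|80.00|85|"
          "33612939415|Iris Bonne Maman|80.00|86")
table, payload = split_change_record(record)
# table identifies the tuple type; payload goes to that table's parser
```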
Deploying the user exit
To deploy the user exit, complete the following steps:
- Compile the Java sources according to the specifications in the compile_readme.txt file
- Copy all .class files to the <cdc_home>/lib directory of the target engine
- Copy the CDCStreams.properties to the <cdc_home> directory of the target engine
Configuring the user exit properties
Before you configure the replication, review the CDCStreams.properties file and specify the method you want to use to transport the changes to the InfoSphere Streams application: TCP/IP (tcp) or a named pipe (namedpipe). For replication to a TCP/IP listener, you must specify the host name or IP address and the port to which the tuples have to be sent. Alternatively, the user exit can target a named pipe, in which case the namedPipe attribute must be set to a fifo file local to the target engine.
If the TCP/IP listener cannot be reached by CDC, the subscription will fail after the specified TCP connection timeout (the default is 2 minutes). If the named pipe does not exist when the subscription is started, it will fail with an error immediately.
There are other properties, such as the field separator (separator attribute), that can be adjusted so the behaviour of the user exit matches the requirements of your InfoSphere Streams application.
Setting up the replication
The mapping of tables when the target is InfoSphere Streams is not much different from setting up the replication between source and target database tables. In this case, however, you will be mapping the source table to a dummy CDC Event Server table. The purpose of the table is to define the structure of the records to be written, not to hold any data. The user exit takes care of intercepting any insert, update, and delete operations, feeding InfoSphere Streams and preventing the execution of the operations on the dummy target table.
First, the subscription is created from the CDC Management Console. Our source tables are in a DB2 LUW database and, as we only intend to target an InfoSphere Streams application, the target CDC datastore is CDC Event Server, as in Figure 6.
Figure 6. New subscription
After the subscription has been created, continue mapping the tables to the subscription. Rather than specifying a message destination, we will use Standard replication, as in Figure 7, which lets us choose or create a target table.
Figure 7. Map tables — Select mapping type
Select the source table to be replicated, which for this example is the TELCO.CUST_THRESHOLD DB2 table.
Figure 8. Map tables — Select source table
There is no target table yet, so we will create a dummy target table by selecting the STAGE schema and Create Table.... In CDC Event Server, STAGE is the only schema in which the dummy target table can be created.
Figure 9. Map tables — Select target table
For convenience, we create a target table with the same name as the source table. This is the table name that will be passed to the user exit.
Figure 10. Create target table
As shown below, specify the columns that comprise the target table. By default, all columns present in the source table are used.
Figure 11. Define columns
A few intermediate windows will be shown, and finally the CREATE TABLE SQL statement is displayed.
Figure 12. Create table SQL statement
Now that the dummy table has been created, you can select it as the target table.
Figure 13. Select target table
You can specify the keys to use to update or delete records. They are not important for the example because we will not perform any operations on the target table; we will just use the record images passed by the CDC source engine.
Figure 14. Specify key
The replication method of the mapped table should be set to Mirror (Change Data Capture) because we want to send the changes as they occur.
Figure 15. Set replication method
Now that the table mapping is finished, complete the configuration of the user exits, both at the subscription level and at the table level, to ensure that the InfoSphere Streams application is targeted. Open the table mapping from the CDC Management Console to continue.
Figure 16. Mapped table
The subscription-level user exit must be defined because it takes care of the initialization and reading the configuration from the properties file. In this example, we set the user exit to Java class CDCStreams(.class), which is available in Downloadable resources. The subscription- and row-level user exits have been consolidated into a single class. If you fail to set the subscription-level user exit, the replication stops with an error when the first operation is replicated.
Figure 17. Set subscription-level user exit
Figure 18. Set class name
You also have to set the same user exit at the row level for all mapped tables for at least one of the operations (before-insert, before-update, or before-delete). As long as the user exit is configured for one of the operations, it will automatically activate itself for the other before operations. To convey the fact that the user exit is invoked prior to all insert, update, and delete operations, we recommend that you check all three boxes.
Figure 19. Set row-level user exit
Save the table mappings and continue to map the remainder of the tables in scope, keeping in mind that for every table the row-level user exits must be configured.
Testing the replication and the user exit
If this is the first time you are replicating changes from CDC to your InfoSphere Streams application, consider validating the output of the user exit. This section shows a method for setting up a target for your subscription to validate the tuples that will be sent to the InfoSphere Streams application.
Testing the TCP/IP target
On the server that runs the TCPSource operator of your InfoSphere Streams application, run the nc -l <port> command from the Linux command line.
Listing 3. Start a listener
[streamsadmin@streams-server ~]# nc -l 12345
This command starts a listener (server) at port 12345 and outputs all information received. When you start the subscription, you will find some events in the target subscription event log.
Figure 20. Subscription target event log for TCP/IP connection
Insert some records into the source table, and the listener command lists the tuples that have been received.
Listing 4. List received tuples
[streamsadmin@streams-server] # nc -l 12345
TELCO.CUST_THRESHOLD|I|2014-05-23 16:57:07.000000000000|||||31653111222|Isaac Stroming|200.00|70
TELCO.CUST_THRESHOLD|I|2014-05-23 16:59:26.000000000000|||||31651444333|Chris D. Cosi|350.00|45
TELCO.CUST_THRESHOLD|I|2014-05-23 17:00:28.000000000000|||||33612939415|Iris Bonne Maman|80.00|90
Testing the named pipe target
Create a named pipe as specified in the CDCStreams.properties file using the mkfifo <named pipe filename> command. After the named pipe has been created, start the replication. You can view the tuples by doing a cat <named pipe filename> on the Linux command line.
Listing 5. View tuples received through the named pipe
[streamsadmin@streams-server] cat CDCNamedPipe.csv
TELCO.CUST_THRESHOLD|2014-06-25 12:35:59.000000000000|149363|U|DB2INST1|33612939415|Iris Bonne Maman|80.00|85|33612939415|Iris Bonne Maman|80.00|86
TELCO.CUST_THRESHOLD|2014-06-25 12:36:44.000000000000|149364|U|DB2INST1|44722333444|River Flenix|56.00|100|44722333444|River Flenix|65.00|100
InfoSphere Streams application examples
In the example InfoSphere Streams applications, we use Export and Import operators so we can reuse parts of the application and provide flexibility on how the tuples are fed to it.
Figure 21. Sample InfoSphere Streams application flow
We can distinguish two parts in the sample application. The first part receives the changes and formats them so they can be processed in the second part of the sample application.
Ingesting flat file data
In the CDCReceiveFlatFile.spl composite, the /tmp/telco directory is scanned for files starting with CUST_THRESHOLD.D or RATED_CDR.D. Files with this pattern are the hardened ones, and the InfoSphere Streams application can safely read them since they will not change anymore. Because the files that are written into the scanned directory already hold the name of the table, the mapping of the file records to the respective tuple types is straightforward and can be done without any further parsing. After the input records are converted into tuples, they are exported for further processing by the CDCTupleProcess.spl composite.
Ingesting data coming from the user exit
The CDCReceiveTCP.spl and CDCReceiveNamedPipe.spl composites read change records from a TCP/IP socket and from a named pipe, respectively. Changes arrive from CDC in near real time.
Because only one port or one named pipe is configurable in the user exit, changes from all tables mapped in the subscription will be read by the same TCPSource or FileSource operator. Each of the change records is prefixed with the fully qualified source table name so you can distinguish the tuple type. As both source types deliver the same type of information, we have separated the parsing of the record from the ingestion; all change records are immediately exported to be processed by the CDCTupleSplit.spl composite.
During the first part of the CDCTupleSplit.spl composite, the raw record is split to extract the table name and the actual data that must be parsed into a tuple. Because the InfoSphere Streams Parse operator expects a blob data type, the data is converted to this type and the newline character that was suppressed by the source operator is also re-added. In the subsequent Split operator, the records are directed to the output port that leads to the Parse operator for the table in question.
After the input records are converted into tuples, they are exported for further processing by the CDCTupleProcess.spl composite.
Processing the tuples
The example CDCTupleProcess.spl composite imports tuples coming from the CDCReceiveFlatFile.spl and CDCTupleSplit.spl composites and writes them to output files in the /tmp/telco/output directory.
Running the InfoSphere Streams application and receiving tuples
Finally, we run the sample InfoSphere Streams application and make some changes in the source tables, which are then reflected in the output files.
Listing 6. Change source tables
[streamsadmin@streams-server] cat /tmp/telco/output/custthreshold_output.txt
"2014-06-25 18:19:24.000000000000","152151","U","DB2INST1","33612939415","Iris Bonne Maman","80.00","86","33612939415","Iris Bonne Maman","80.00","85"
"2014-06-25 18:19:25.000000000000","152211","U","DB2INST1","44722333444","River Flenix","65.00","100","44722333444","River Flenix","66.00","100"
In this article, you learned how to use InfoSphere Data Replication CDC to capture changes made on source transactional databases and to replicate them to an InfoSphere Streams application for real-time analytical processing. You can use the files in Downloadable resources to start integrating the two products. Related topics provides links to learn more about CDC and InfoSphere Streams.
- Learn more about InfoSphere Data Replication CDC from the IBM Redbooks® titled "Smarter Business: Dynamic Information with IBM InfoSphere Data Replication CDC." Specifically, see the chapter that discusses "Customization and Automation" for more information about developing user exits.
- Visit the developerWorks InfoSphere Streams Playbook to find more resources for InfoSphere Streams developers.