Real-time integration with InfoSphere Event Publisher
Part 1 discussed the technologies behind the InfoSphere Replication Server and InfoSphere DataStage products and the different ways of integrating the two to feed the data warehouse, along with the pros and cons of each integration option. Review Part 1 if necessary before you begin to set up and configure the integration.
Figure 1 illustrates a sample real-time integration setup. The Event Publisher configuration, where the source data resides, runs on the Windows system svl-akrishni2. The operational data store (ODS), where InfoSphere DataStage runs, is on the AIX system scoobydoo. The data warehouse location does not matter; it can be configured on an entirely different system.
Figure 1. Event publication integration details
The Q Capture program captures the changes in the Product and Inventory source tables from the log and publishes them to a WebSphere MQ Queue. This queue is referred to as the send queue (SALES_DATA_TO_DATASTAGE) and is defined in a queue manager QMEP on the Windows system svl-akrishni2. This QMEP queue manager sends the messages across to a queue on the AIX system. This queue is referred to as the receive queue (SALES_DATA_TO_DATASTAGE, which is the same name as the send queue) and is defined in a queue manager QMDS on the AIX system.
The event publication is configured so that the message is in comma-separated value (CSV) format, and each message contains a single row operation, such as INSERT, UPDATE, or DELETE. Listing 1 shows a sample MQ message in the queue.
Listing 1. Sample MQ message in the queue
10,"IBM","2009355","021255984000","AKRISHNI","PRODUCT","REPL","0000:0000:0000:0000:5ae5", "0000:87a0:c204:0000:0000","2009-12-21-10.12.52",,0000,"100-201-01","Ice Scraper, Windshield 4 inch",3.99,,,,,"100-201-01","Ice Scraper1, Windshield 4inch",3.99,,,," <product pid=""100-201-01"" ><description><name>Ice Scraper,Windshield 4 inch</name><details> Basic Ice Scraper 4 inches wide, foam handle </details><price>3.99</price></description></product>"
As described in Part 1, the first 12 columns in the message are replication headers, and the remaining columns are the change data captured from the source table. The headers in columns 6 and 7 are important to your setup and configuration. The header in column 6 is the name of the source table, and the header in column 7 is a four-character code for the SQL operation that caused the row change: ISRT for insert, REPL for update, and DLET for delete. For a detailed explanation of all the headers in the message, refer to the IBM InfoSphere Event Publisher information in the DB2 V9.7 Information Center (see Resources).
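This header layout can be checked outside of DataStage. The following Python sketch (not part of the tutorial's job; the function name is illustrative) extracts the schema, table, and operation headers from a message. Note that Python's csv module strips the surrounding quotes and handles doubled quotes inside a quoted field, unlike a naive comma split.

```python
import csv
import io

# Map the four-character operation codes in header column 7 to row operations.
OP_CODES = {"ISRT": "insert", "REPL": "update", "DLET": "delete"}

def parse_header(message):
    """Extract schema name, table name, and operation from columns 5-7
    of a CSV message."""
    fields = next(csv.reader(io.StringIO(message)))
    return fields[4], fields[5], OP_CODES[fields[6]]

# Abbreviated sample message: 12 replication headers, then change data
msg = ('10,"IBM","2009355","021255984000","AKRISHNI","PRODUCT","REPL",'
       '"0000:0000:0000:0000:5ae5","0000:87a0:c204:0000:0000",'
       '"2009-12-21-10.12.52",,0000,"100-201-01"')
schema, table, operation = parse_header(msg)
```

Here `parse_header` returns ("AKRISHNI", "PRODUCT", "update") for the abbreviated sample message.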
The DataStage job does the following:
- Reads the messages from the receive queue on the AIX system
- Parses the messages
- Transforms the data
- Writes to data sets, which are operating system files created by data set stage
The change data corresponding to each of the three types of row operations for a particular source table (INSERT, UPDATE, or DELETE) is written to a different data set. So for each source table, the DataStage job generates three data sets containing the insert, update, and delete data, respectively.
Note: The example assumes there are no dependencies between the messages, such as referential constraints. The order of processing an INSERT message can be separate from the order of processing a DELETE message. If there are dependencies across the messages, all the messages need to be written to a single data set in order to preserve the ordering, and the DataStage job itself needs to ensure that messages are processed in the correct order.
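A minimal sketch (with hypothetical messages, not from the download) of why ordering matters when messages depend on each other: applying the operations in arrival order to an in-memory copy of a table gives the correct final state, whereas processing the DELETE from a separate data set before the INSERT would not.

```python
def apply_in_order(messages, table):
    """Apply row operations in arrival order. Each message is an
    (operation, key, after-value) tuple."""
    for op, key, value in messages:
        if op in ("ISRT", "REPL"):
            table[key] = value      # insert or update the row
        elif op == "DLET":
            table.pop(key, None)    # remove the row
    return table

# Insert, update, then delete the same key: the net effect is no row.
result = apply_in_order(
    [("ISRT", "100-201-01", 3.99),
     ("REPL", "100-201-01", 4.49),
     ("DLET", "100-201-01", None)],
    {})
```

Because the three operations touch the same key, `result` is an empty table; any reordering across separate streams could leave a phantom row behind.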
The high-level steps for the Event Publisher configuration are as follows:
- Create the DB2 objects (the source tables)
Note: This first step is only needed for this example scenario; it is not a step in the configuration setup. Normally, the DB2 objects would already exist, and you would identify the source tables and determine the subset of data to be replicated.
- Create the Event Publisher setup by doing the following:
- Create the DataStage setup by doing the following:
- Test the real-time configuration setup by doing the following:
The following sections describe the example in detail. All the scripts needed to configure this sample are included with this tutorial in the Download section.
For the sample setup, you will use a provided database called SALES and import the PRODUCT and INVENTORY tables, which are ixf files provided with the download. These are the source tables for the scenario. Alternatively, you can either use the PRODUCT and INVENTORY tables that come with the DB2 V9.7 sample database, or you can import them into another database of your choice.
To import the Product and Inventory tables, change directories to setupDB in the downloaded attachment, where you will find the product.ixf file and the inventory.ixf file. Open a DB2 command window by entering db2cmd, and run the following commands:
import from product.ixf of ixf create into product
import from inventory.ixf of ixf create into inventory
Begin creating the Event Publisher setup by setting up the MQ as follows:
- Create the WebSphere MQ queue manager QMEP on the source system svl-akrishni2 by issuing the command Crtmqm.exe QMEP.
- Start the queue manager by issuing the command Strmqm.exe QMEP.
- Create the rest of the MQ objects on the source system by issuing the command runmqsc.exe QMEP < mqObjectsPublisher.in.
The script mqObjectsPublisher.in is available under the RealTimeInt\setupEP directory of the download. This script creates all the MQ objects on the source system (queues, channel, and listener).
- Create the queue manager and the MQ objects on the target (subscriber) system by entering the following:
crtmqm QMDS
strmqm QMDS
runmqsc QMDS < mqObjectsSubscriber.in
The script mqObjectsSubscriber.in is available under the RealTimeInt\setupEP directory of the download that is available with this tutorial.
Complete the following steps to set up the EP.
- Create the control tables required for the publication setup by running the following script with the ASNCLP program (the command-line replication administration tool) in a DB2 command window:
asnclp -f crtCtlTables.in
- Create the queue map and the publications by entering:
asnclp -f crtQMapAndPublications.in
The crtCtlTables.in and crtQMapAndPublications.in scripts are available under the RealTimeInt\setupEP directory of the download. These scripts create the queue map (a mapping of the MQ objects to Q replication objects) and the publications for the Product and the Inventory tables.
Start the EP (Q Capture program) by issuing the following command in a DB2 command window:
asnqcap CAPTURE_SERVER=SALES CAPTURE_SCHEMA=ASN
This starts the Q Capture program, which starts publishing the changes in the Product and the Inventory tables. You can also use the startQCapture.bat script, provided in the RealTimeInt\setupEP directory of this tutorial's download.
In the DataStage job, you use a WebSphere MQ connector to read the messages from the receive queue and two transformer stages to parse the messages, as shown in Figure 2. The first transformer stage is used as a selection control to separate the messages related to a particular publication, that is, a particular source table. The second transformer stage is used as a selection control to separate the messages related to a particular operation (insert, update, or delete) for each table. The separated data, which corresponds to each row change in the source table, is written to the data set for the corresponding insert, update, or delete operation. Use the Data Set stage to write the processed data, because data sets are a native file format consisting of a single header file that references the actual data, which can be partitioned across multiple DataStage parallel partitions. This format is readily usable by DataStage in parallel environments and therefore gives the best performance.
Figure 2. Event Publication DataStage job
The actual transformation processing that the main DataStage job performs can use these data sets as input. Typically the DataStage processing separates the data based on the row operation (insert, update, or delete). However, if there are referential constraints with the source data, the change data needs to be written to a single data set, instead of three different data sets.
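The two-level routing performed by the transformer stages can be sketched in Python (assuming messages have already been parsed into field lists; each (table, operation) bucket stands in for one of the six data sets):

```python
from collections import defaultdict

TABLES = {"PRODUCT", "INVENTORY"}
OPERATIONS = {"ISRT", "REPL", "DLET"}

def route(parsed_messages):
    """Route each message first on the table name (column 6), then on
    the operation code (column 7), as the two transformer stages do."""
    buckets = defaultdict(list)
    for fields in parsed_messages:
        table, op = fields[5], fields[6]
        if table in TABLES and op in OPERATIONS:
            buckets[(table, op)].append(fields)
    return buckets

# Synthetic parsed messages: only columns 6 and 7 matter for routing.
sample = [
    [""] * 5 + ["PRODUCT", "ISRT"] + [""] * 6,
    [""] * 5 + ["PRODUCT", "DLET"] + [""] * 6,
    [""] * 5 + ["INVENTORY", "REPL"] + [""] * 6,
]
buckets = route(sample)
```

Each bucket would then be appended to its corresponding data set file.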
The following sections describe in more detailed steps how to set up the DataStage job.
The following list shows the configuration of the MQ Connector:
- Connection properties
- Mode = server
- Queue manager = QMDS
- Username = db2inst1
- Usage properties
- Queue name = SALES_DATA_TO_DATASTAGE
- Access mode = As in queue definition
- Wait time = 1
- Message quantity = -1
- Message read mode = Delete
Following are some notes about the configuration elements:
- Wait time
- Use this property to specify the maximum number of seconds to wait for a new message to arrive on the input queue. The default value is -1, which specifies an indefinite wait. For the example, set it to 1 second.
- Message quantity
- Use this property to specify the number of messages (not rows) to retrieve from the input queue. For the example, set it to -1. A value of -1 specifies an indefinite number of messages. A value of 0 specifies no messages. You can specify integers between -1 and 999999999 for the message quantity. For the example, the message quantity is -1 and the wait time is 1 so that the MQ Connector can fetch all the messages that have arrived in that queue.
- Message read mode
- Use this property to specify how messages are read in the current transaction. For the example, set this to Delete (destructive read). You can also choose the Move to work queue option from the drop-down list if the messages need to be preserved.
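The interaction of the wait time and message quantity properties can be illustrated with a local stand-in queue (this is ordinary Python, not the MQ client API): with quantity -1 and a short wait, the reader keeps fetching until the queue has been idle for the full wait interval.

```python
import queue

def drain(q, wait_seconds=1.0):
    """Read until the queue has been idle for wait_seconds, mimicking
    Wait time = 1 combined with Message quantity = -1 (no upper bound
    on the number of messages fetched)."""
    messages = []
    while True:
        try:
            messages.append(q.get(timeout=wait_seconds))
        except queue.Empty:
            return messages

q = queue.Queue()
for m in ("msg1", "msg2", "msg3"):
    q.put(m)
received = drain(q, wait_seconds=0.1)
```

All three messages are fetched in one call, and the loop returns once no new message arrives within the wait interval.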
Create a column called PayLoad of type Varchar with size 1000, as shown in Figure 3. Because this is a text message, the size refers to the number of characters. For the example, only the portion of the message that follows the message header is important.
Figure 3. Configuration of the MQ Connector
Configure the SeparatePayLoad_Based_On_TableName transformer stage as shown in Figure 4.
Figure 4. Configuration of the first transformer stage
Note the following about the example configuration:
- Define a stage variable called SCHEMANAME, and set it to the fifth column of the input PayLoad.
- Define a stage variable called TABLENAME, and set it to the sixth column of the input PayLoad.
- In Figure 4, there are two output links from the SeparatePayLoad_Based_On_TableName transformer stage. Because each of these links goes to another transformer stage, use the stage variables SCHEMANAME and TABLENAME as selection controls to separate the messages related to the Product and Inventory tables.
Configure the stage constraints, as shown in Figure 5.
Figure 5. First transformer stage constraints
Note that all non-numeric data in the messages is enclosed in double quotes. So when you do a comparison using the TABLENAME stage variable, enclose the actual table name (PRODUCT or INVENTORY) in an extra pair of double quotes.
This transformer stage separates the messages according to the source table name. The sample message is shown in Listing 2.
Listing 2. Sample message
10,"IBM","2009355","021255984000","ADMIN","PRODUCT","REPL","0000:0000:0000:0000:5ae5", "0000:87a0:c204:0000:0000","2009-12-21-10.12.52",,0000,"100-201-01","Ice Scraper, Windshield 4 inch",3.99,,,,,"100-201-01","Ice Scraper1 Windshield 4inch",3.99,,,," <product pid=""100-201-01""><description><name>Ice Scraper, Windshield 4 inch</name><details>Basic Ice Scraper 4 inches wide, foam handle</details><price>3.99</price></description></product>"
Note the following about the sample message:
- The stage variable SCHEMANAME is defined as Field(Read_MQPayLoad.PayLoad, ",", 5). The fifth column in a message is the schema name.
- The stage variable TABLENAME is defined as Field(Read_MQPayLoad.PayLoad, ",", 6). The sixth column in a message is the table name.
- All non-numeric data in the messages is enclosed in double quotes. So when you define the SCHEMANAME or TABLENAME constraint, enclose the name in an extra pair of double quotes.
- The seventh column in a message is always the type of operation: ISRT, REPL, or DLET for insert, update, or delete, respectively.
- The source data starts from column 13.
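Why the constraints need the extra pair of double quotes can be seen with a rough Python stand-in for the DataStage Field() function (an approximation for illustration, not the actual implementation): a plain delimiter split does not honor quoting, so quoted values keep their surrounding double quotes.

```python
def field(text, delimiter, n):
    """Rough stand-in for DataStage Field(): return the n-th delimited
    token (1-based). It does not honor quoting, so a quoted value keeps
    its surrounding double quotes."""
    return text.split(delimiter)[n - 1]

# Headers 1-12 contain no embedded commas, so a plain split is safe here.
msg = ('10,"IBM","2009355","021255984000","ADMIN","PRODUCT","REPL",'
       '"0000:0000:0000:0000:5ae5","0000:87a0:c204:0000:0000",'
       '"2009-12-21-10.12.52",,0000')
TABLENAME = field(msg, ",", 6)          # '"PRODUCT"', quotes included
is_product = (TABLENAME == '"PRODUCT"')  # compare against the quoted name
```

Comparing against `PRODUCT` without the literal quotes would never match, which is exactly the pitfall the constraint notes warn about.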
After you separate the messages related to the PRODUCT and the INVENTORY tables, configure the transformer stages SeparatePayLoad_Based_On_I_U_D_Operation1 and SeparatePayLoad_Based_On_I_U_D_Operation2 to parse the row operations data (insert, update and delete). Figure 6 shows how the SeparatePayLoad_Based_On_I_U_D_Operation1 transformer stage is configured to parse the columns in the PRODUCT table.
Figure 6. Configuration of the second transformer stage
Note the following about the configuration of the second transformer stage:
- Define a stage variable called OPERATION, and set it to the seventh column of the input PayLoad.
- You have three output links from the SeparatePayLoad_Based_On_I_U_D_Operation1 transformer stage, as shown in Figure 2. Because each of these links goes to a Data Set stage, use the stage variable OPERATION as a selection control to separate messages related to insert, update, and delete operations.
- Define all the columns in the Product table, and map them to the individual columns in the PayLoad. For example, for the insert operation, map the columns PID, NAME, PRICE, PROMOPRICE, PROMOSTART, and PROMOEND to the columns in the PayLoad, starting at location 19.
- For the insert operation, the before values of all the columns in the Product table are empty, so the columns from 13 to 18 are empty.
- For the delete operation, the after values for all columns are empty, so the columns 19 and higher are empty.
- Update operations have both before and after values.
- Non-numeric column values are enclosed in double quotes in the MQ message.
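The before/after split described above can be sketched with synthetic field lists (assuming the six-column Product layout shown in the figures: before values in positions 13 to 18, after values in positions 19 to 24, 1-based):

```python
def split_images(fields):
    """Split a parsed Product message into before and after images.
    Positions 13-18 (1-based) hold the before values; 19-24 hold the
    after values."""
    return fields[12:18], fields[18:24]

# Synthetic REPL message: 12 headers, then before and after images.
headers = ["10", "IBM", "2009355", "021255984000", "ADMIN",
           "PRODUCT", "REPL", "lsn1", "lsn2",
           "2009-12-21-10.12.52", "", "0000"]
before_img = ["100-201-01", "Ice Scraper", "3.99", "", "", ""]
after_img = ["100-201-01", "Ice Scraper1", "3.99", "", "", ""]
before, after = split_images(headers + before_img + after_img)
```

For an ISRT message the before slice would be all empty fields, and for a DLET message the after slice would be, matching the notes above.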
Figure 7 shows the column mapping for the update operation.
Figure 7. Column mapping for update operation
Note the following about the column mapping for the update operation:
- Figure 7 shows the column mappings for the update operation (operation keyword REPL).
- Locations 13 to 18 in PayLoad hold the before values, and locations 19 to 24 hold the after values.
Figure 8 shows how the operations data is separated in the Inventory table.
Figure 8. Separating the operations data in the Inventory table
Note that the configuration of the SeparatePayLoad_Based_On_I_U_D_Operation2 transformer stage separates the operations data for the Inventory table.
Complete the insert, update, and delete Data Set stages as described in this section. Figure 9 shows how to set the Data Set properties.
Figure 9. Data set properties
Note the following about setting Data Set properties:
- Figure 9 shows the Data Set stage properties for the data from the insert operation in the Product table.
- The update policy is set to Append. Make sure that the update policy for all the Data Sets is set to Append.
- Set the file property to the appropriate name with the full path.
- The file named in the File property is created if it does not exist. However, the directories specified in the path must already exist in the file system.
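The append behavior can be illustrated with plain Python file handling (a local analogy, not the Data Set format itself): append mode creates the file if it is missing but fails if a directory in the path does not exist, just as the File property notes describe.

```python
import os
import tempfile

def append_rows(path, rows):
    """Append rows to a file. Mode 'a' creates the file if it is
    missing, but raises FileNotFoundError if a directory in the
    path does not exist."""
    with open(path, "a") as f:
        for row in rows:
            f.write(row + "\n")

workdir = tempfile.mkdtemp()   # this directory exists
target = os.path.join(workdir, "Inserts_DataSet1.ds")
append_rows(target, ['"100-201-01","Ice Scraper",3.99'])  # file is created
```

Running the same call again adds rows rather than overwriting, which is the behavior the Append update policy guarantees for the data sets.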
Figure 10 shows the column configuration of Data Set stage Inserts_DataSet1.
Figure 10. Column configuration of the insert data set
Figure 11 shows the column configuration of the update data set.
Figure 11. Column configuration of the update data set
Note the following about the column configuration of the update data set:
- Figure 11 shows the column configuration of the Update_DataSet1 Data Set stage.
- Before and after image values are captured for update operations.
Figure 12 shows the column configuration of the delete data set.
Figure 12. Column configuration of the deletes data set
Note the following about the delete data set:
- Figure 12 shows the column definitions for the Deletes_DataSet1.
- These are the before-image values from the Product table.
The following steps describe at a high level what is needed to test the real-time configuration setup. The steps are described in more detail in the following sections.
- Start Event Publisher
- Import the DataStage job
- Compile the DataStage job
- Run the test script to introduce a change in the source data
- Run the DataStage job
- Verify the data in the data sets to show they were populated
Complete the following steps to start Event Publisher.
- Open a DB2 command window and cd to the RealTimeInt\setupEP directory from the download.
- Start the Event Publisher by running the startQCapture.bat script.
- Make sure that you get the ASN0572I message that the program initialized successfully, as shown in Figure 13.
Figure 13. Start Q Capture in DB2 cmd window
Complete the following steps to import the DataStage job.
- Start the DataStage Designer, and import the RealTimeDataInt.dsx job by browsing to the RealTimeInt directory from the download.
- Click OK in the DataStage Import window to complete the import of the job as shown in Figures 14 and 15.
Figure 14. Import DataStage job
Figure 15. More Import DataStage job
Complete the following steps to compile the DataStage job.
- If you used a different queue manager or receive queue name, change these values in the MQ Connector stage by double-clicking the stage to open its properties page.
- If you run the job on a Windows® platform, change the File properties of all the Data Set stages: Inserts_DataSet1, Updates_DataSet1, Deletes_DataSet1, Inserts_DataSet2, Updates_DataSet2, and Deletes_DataSet2.
- Compile the DataStage job by clicking Compile in the DataStage Designer menu, as shown in the Figure 16.
Figure 16. Compile the DataStage job
Complete the following steps to run the test script.
- Open a DB2 command window, and cd to the RealTimeInt\setupEP directory.
- Run the updateSourceTables.sql script. This script does an insert, update, and delete on the Product and the Inventory tables.
Complete the following steps to run the DataStage job.
- Start the DataStage Director by clicking the All Programs > IBM Information Server > IBM WebSphere DataStage and QualityStage Director shortcut.
- Run the job by clicking the Run icon on the DataStage Director menu, as shown in Figure 17.
Figure 17. Run the DataStage job
View the job status log by right-clicking on the job and clicking View Log from the DataStage Director, as shown in Figure 18.
Figure 18. View job log
You can also verify that the job ran successfully from the Designer, which shows the data flow links in green with the number of rows processed between stages. Figure 19 shows that six messages were processed: one each for the insert, update, and delete operations on the Product and the Inventory tables.
Figure 19. Check job status
You can view the data set by right-clicking on the data set and clicking View data. Figure 20 shows the data in the Updates_DataSet1 for the Product table.
Figure 20. Updates data set
Similarly you can verify the propagated change data by viewing the data in each of the data sets.