High-performance solution to feeding a data warehouse with real-time data, Part 1: Integrate InfoSphere Replication Server and InfoSphere DataStage

Trickle feed your data warehouse

Feeding a data warehouse with changes from the source database can be very expensive. If the extraction is done only with SQL, there is no easy way to identify the rows that have changed. IBM® InfoSphere® Replication Server can detect changed data by reading only the database log. This article series illustrates how you can use InfoSphere Replication Server to efficiently extract just the changed data, and then pass the changes to IBM InfoSphere DataStage® to feed the data warehouse. In Part 1 of this two-part series, get an overview of the products and how they work together. In Part 2, explore two different integration options: with staging tables or with MQ messages.

Anand Krishniyer (akrishni@us.ibm.com), Staff Software Engineer, InfoSphere Replication, IBM

Anand Krishniyer is a staff engineer with the InfoSphere Replication development organization. As a member of the admin team, his responsibilities include developing line items as well as providing technical assistance to customers with respect to installation, setup, and configuration of the Replication Server. Prior to his current role, Anand worked as a project lead for the Integration and Tools team for a process management company, Savvion (now part of Progress Software).



Tony Lee (tonylee@us.ibm.com), Senior Certified IT Specialist, InfoSphere Replication, IBM

Tony Lee is a certified senior IT specialist in the InfoSphere Replication Center of Competency (COC), a team within the InfoSphere Replication development organization. As a member of the COC, Tony has provided technical assistance to customers and business partners on InfoSphere replication technologies. Tony has many years of consulting experience with various IBM replication technologies, covering SQL replication, Q replication, and, most recently, InfoSphere Change Data Capture. Prior to his current role, Tony provided Information Management technical consultation to customers and partners for nearly a decade, covering a wide range of topics, from DB2 tuning to Information Server and Master Data Management. Prior to becoming a consultant, Tony worked in many different roles in the Information Management area, ranging from management to development.



James Yau (jamesyau@us.ibm.com), Technical Solution Architect, InfoSphere Information Server, IBM

James Yau is a senior solution architect certified on the InfoSphere Information Server DataStage product. Currently, he is part of the InfoSphere Technology Enablement organization responsible for Information Server Boot Camp content development and delivery. James has many years of consulting experience with the Information Server Suite of products, including InfoSphere DataStage, QualityStage, Information Analyzer, and FastTrack. Prior to his current role, James was part of a Business Partner Technical Enablement team, in which he was the technical program manager for InfoSphere Information Server. His role included course content development and delivery with various delivery vehicles, such as instructor-led, online, and self-paced learning. In the past, James worked in many different roles, ranging from software developer to marketing manager, both in IBM and outside of IBM.



29 July 2010


Introduction

In today's fast-paced world, the success or failure of a business is often determined by how quickly it can react to change. Whether it is a retailer dynamically responding to changes in inventory levels or a financial institution reacting to a change in interest rates, the business that can detect and respond to change the quickest has a competitive advantage. The traditional approach to business intelligence (BI), where transactional data is gathered, transformed, and then bulk-loaded into a data warehouse on a periodic basis (say, daily), is no longer sufficient to meet the real-time demands of an on-demand business. Furthermore, as the amount of transactional data increases, the batch window available for a traditional load of the data warehouse shrinks, making the batch process even less viable for real-time data requirements.

The basic approach to feeding a data warehouse typically consists of three phases: E (Extract), T (Transform), and L (Load).

Extract
Read the source data from various transactional data sources, such as IBM DB2®, Oracle, or flat files. The traditional approach often requires reading an entire transactional database with a predicate to filter out only the rows of interest that changed since the last time the feed was done, as sketched in the example after this list. This means that for a transactional database of 100GB, if only 5% of the data (5GB) changed since the last load, the extract phase still needs to read all 100GB of data in order to extract the 5GB of changed data.
Transform
Transform the transactional data, which is often highly normalized, into a warehouse format usable for BI analysis, such as a star schema. This can include looking up codes, joining data across transactional systems, cleansing data, converting data into dimensional data, and more. (See Resources for data warehouse IBM Redbooks® publications.)
Load
After the data has been transformed, populate the warehouse, typically with either SQL or the native import/load utility of the warehouse database.
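
To make the cost of the traditional, SQL-only extract concrete, here is a minimal sketch of a timestamp-based incremental extract. The ORDERS table, its LAST_UPDATED audit column, and the :last_extract_time variable are hypothetical names chosen for illustration; they are not part of the products discussed in this article.

    -- Hypothetical example of a traditional SQL-only incremental extract.
    -- ORDERS, LAST_UPDATED, and :last_extract_time are illustrative names.
    SELECT order_id, customer_id, order_total, last_updated
      FROM orders
     WHERE last_updated > :last_extract_time;
    -- Even though only a few percent of the rows qualify, the database may
    -- still scan the whole table (unless LAST_UPDATED is indexed), and rows
    -- deleted since the last run are never seen by this query at all.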

This article considers how InfoSphere Replication Server can drastically reduce the time spent in the extract phase, and how its output can be integrated with InfoSphere Information Server (the main component used here is DataStage) to perform the transform and load phases. The entire ETL process can be shortened to the point where real-time BI can be realized.


Technology overview

The two key technologies discussed in this article are InfoSphere Replication Server and InfoSphere DataStage (a component of InfoSphere Information Server). InfoSphere Replication Server provides the extract function; InfoSphere DataStage provides the transform and load functions.

InfoSphere Replication Server

The key to achieving high data extract performance with InfoSphere Replication Server is the fact that all major relational databases use a log to record changes to the database. For example, if a record is inserted into a table, a log record storing the inserted data is written to the database log so it can be used for recovery purposes. Similarly, updates and deletes are logged with sufficient data for recovery. InfoSphere Replication Server understands how to read the log to pick up the changes in the database (it supports native log read of DB2 and Oracle today). Therefore, instead of scanning an entire database (a 100GB database, for example) to find the inserted, updated, or deleted records, InfoSphere Replication Server only needs to read the log records for the data that changed (5GB of log data if only 5% was changed, to follow the same example).

Main components of InfoSphere Replication Server architecture:

  • Capture: This component is responsible for reading the database log and capturing the changes. This component typically runs on the same machine where the source database resides, although for some databases, it can also run remotely from the source database.
  • Apply: This component is responsible for applying the changes captured by the Capture component to a target database. Various transformations can be done (such as subsetting rows and columns), and multiple target table types are supported. For example, the target can be a table that looks just like the source table, or you can specify a target table that receives a new row for every change (for example, instead of deleting the target row when the source row is deleted, a new row is inserted in the target marking the operation as a "delete" and recording the deleted data).
  • Admin: This component provides an interface to configure and administer InfoSphere Replication Server. Both a graphical user interface and a command-line interface are available.

Architectural options:

  • SQL replication: This was the original replication architecture. SQL Capture uses staging tables, called Change Data (CD) tables, to store the captured changes it reads from the log. SQL Apply issues SQL to read the CD tables, and then issues SQL to write the changes to the target tables. While it is slower, SQL replication works very well in some scenarios, such as distributing source data to a large number of targets.

    Figure 1 shows Capture using CD tables to store captured changes from the logs, Apply reading those tables, and then writing the changes to the target tables.

    Figure 1. InfoSphere SQL replication architecture
  • Q replication: This is the newer architecture. Instead of writing the captured data into CD tables, Q Capture writes it into WebSphere MQ queues. The MQ queue becomes the transport mechanism: Q Apply, running at the target machine and reading from the receive end of the queue, replays the transactions and writes the changes to the targets. Q replication therefore provides real-time data replication with much higher performance, sub-second latency, and extremely high throughput.

    Figure 2 shows Q Capture getting data from the logs, writing it into MQ queues, Q Apply reading from the queue, and writing the changes to the target tables.

    Figure 2. InfoSphere Q Replication architecture
  • Event publishing: Event publishing is essentially running Q Capture without Q Apply. In other words, instead of running Q Apply to read the receive queue for changes, the captured changes are written by Q Capture in a documented format (either XML or comma-separated values (CSV)) so you can write your own program to read the output directly from the receive queue. This article later discusses how this is a viable option for integration with InfoSphere DataStage, as InfoSphere DataStage can directly consume the changes from the MQ receive queue and bypass the Q Apply overhead.

    Figure 3 shows a user app reading from the MQ queue and writing to a user file or user table.

    Figure 3. InfoSphere event publishing architecture

See the Resources section for more information related to InfoSphere Replication Server.

InfoSphere DataStage

InfoSphere DataStage is a powerful ETL tool that lets you graphically construct a job to perform ETL functions without writing any code. The product is packaged as part of the InfoSphere Information Server product. InfoSphere DataStage comes with dozens of built-in transformation functions called stages. For example, there is a stage to read from a database table, a stage to join data, a stage to transform input data, a stage to cleanse data, and more. You can graphically drag stages from the palette to the designer canvas, and then connect the output of one stage to the input of another stage. Each stage provides customizable properties (for example, input table name, column definitions, transformation formulas, and so on). The job is then compiled, which generates an executable job in the InfoSphere DataStage proprietary language. When the job is executed, data is extracted, transformed, and loaded per the definitions of the stages. One of the most significant features of InfoSphere DataStage is that it is built on a highly configurable parallel engine, together with a proprietary parallel file structure. These parallel features enable DataStage to achieve very high performance.

Figure 4 shows an example InfoSphere DataStage job on the InfoSphere DataStage Designer client.

Figure 4. A sample InfoSphere DataStage job shown in the InfoSphere DataStage Designer
Screen capture graphically shows the movement of data through the stages in the designer

In addition to supporting data flow design, InfoSphere DataStage also supports visual workflow design, as shown in Figure 5:

Figure 5. A sample InfoSphere DataStage workflow sequence

See the InfoSphere DataStage reference in the Resources section for more details.


InfoSphere Replication Server integration with InfoSphere DataStage

Overview

While InfoSphere DataStage has stages for accessing data from different databases, it does not have the capability to capture just the changed data by reading the database logs. InfoSphere Replication Server, on the other hand, can capture only the changes from the database, but it does not have the robust transformation capabilities of InfoSphere DataStage. Let's explore the various integration options between these two technologies.

Fundamentally there are two architectural approaches to integrate InfoSphere Replication Server with InfoSphere DataStage:

  • Real time: The changed data is consumed directly by InfoSphere DataStage with no intermediate staging.
  • Staged data: The changed data is landed, either as a file, or in a relational table. InfoSphere DataStage then reads that landed data.

InfoSphere Replication Server can integrate with InfoSphere DataStage with either of these architectures.

Real-time integration

With real-time integration, the changes made in the source tables are captured by the Q Capture component of InfoSphere Replication Server and written to an MQ queue. However, instead of using Q Apply to read the changes from the MQ queue at the target and land them in a staging table, you use the event publishing feature of InfoSphere Replication Server and let the InfoSphere DataStage job read directly from the MQ queue and process the changes.

Event publishing supports two MQ message formats: XML and comma-separated values (CSV). While XML is highly portable and flexible, CSV gives better performance. InfoSphere DataStage has an MQ Connector stage, which can be used in a job to read the messages off the queue. The job can then parse the messages based on the chosen format and perform the necessary transformations. (Part 2 of this series walks you through a detailed example.)

As illustrated in Figure 6, Q Capture reads logs and sends changes to the MQ queue. Data is then sent to target DB (ODS) where it is parsed, written to data sets, transformed, and sent to the warehouse DB.

Figure 6. Real-time integration details

See the Resources section for more information related to InfoSphere Replication Server.

Real-time integration gives the best performance, as it avoids I/O and processing overhead of landing the changed data. However, as Q Capture will continually push changes to MQ, the InfoSphere DataStage transform job will need to be able to consume and transform the captured changes sufficiently fast, so as not to fall behind Q Capture. One of the advantages of using MQ as the transport mechanism by Q replication is that MQ can buffer the changes if the InfoSphere DataStage job is unable to keep up. But obviously the job must eventually catch up. Real-time integration may not be the best integration option if the transformation needed involves dealing with a larger amount of data than what's available in real time (for example, in the aggregation of a group), in which case the transformation job may still need to stage the data independent of the replication technique. In that case, the "staged data" integration option may be the right solution.

Staged data integration

Instead of letting the InfoSphere DataStage transform job consume the changes directly off the MQ queue, the staged data approach writes the changes to a target table, which in turn is read by the InfoSphere DataStage transform job. This staging table can reside, for example, at an ODS (Operational Data Store), where other data needed for the transform job may also be gathered. In InfoSphere Replication Server this staging table is called a consistent change data (CCD) table. (For those familiar with SQL replication, the CCD target table is similar to the CD table at the source; the CD table itself can also be used if the InfoSphere DataStage transform job can run at the source server where the CD table resides, bypassing the need to run SQL Apply.) For either the CD or the CCD table, changes from the log are always inserted directly into the staging table, with a column denoting whether the operation in the log was an insert, update, or delete. This information can therefore be readily used by the InfoSphere DataStage job to perform the transformation.
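
As a rough illustration of what such a staging table looks like, here is a minimal sketch of a CCD-style table definition in DB2 syntax. The table name and the user columns (ORDER_ID, ORDER_TOTAL) are hypothetical, and the exact data types of the IBMSNAP_* columns can differ by platform and product version; consult the replication documentation for the authoritative definitions.

    -- Hedged sketch of a CCD-style staging table (DB2 syntax).
    -- STAGING.ORDERS_CCD and the ORDER_* columns are hypothetical;
    -- IBMSNAP_* column types are approximate and vary by platform/version.
    CREATE TABLE staging.orders_ccd (
        ibmsnap_intentseq  VARCHAR(16) FOR BIT DATA NOT NULL,  -- unique LSN per log record
        ibmsnap_commitseq  VARCHAR(16) FOR BIT DATA NOT NULL,  -- commit LSN, shared by a transaction
        ibmsnap_operation  CHAR(1) NOT NULL,                   -- insert, update, or delete marker
        ibmsnap_logmarker  TIMESTAMP NOT NULL,                 -- source commit time
        order_id           INTEGER NOT NULL,                   -- replicated user columns
        order_total        DECIMAL(12,2)
    );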

Figure 7 shows a sample CCD table containing the logmarker and the operation columns.

Figure 7. A snapshot of a sample CCD table

Let's take a closer look at some of the columns in Figure 7.

  • IBMSNAP_INTENTSEQ: This is an ever-increasing LSN (Log Sequence Number) that uniquely identifies every log record. This information can be used by replication to determine whether a log record has been processed or not.
  • IBMSNAP_COMMITSEQ: This LSN is unique for every transaction. In this example, there are four transactions with five changes: the last two updates belong to the same transaction (x'…1101213D'). InfoSphere Replication Server guarantees transactional consistency in the target. Therefore, if a transaction has multiple operations, either all the rows corresponding to the operations will be written to the CCD, or none will appear.
  • IBMSNAP_OPERATION: This identifies whether the record represents an insert, update, or delete at the source. This information can be used within the InfoSphere DataStage job to determine what transformation should be done. For example, if the value is 'D', write the information to an audit file to record that this row was deleted, but do not delete the data from the warehouse (see the sample queries after this list).
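
As a minimal sketch of how an InfoSphere DataStage job (or any SQL consumer) might use these columns, the queries below split the staged changes by operation, routing deletes to an audit feed rather than applying them to the warehouse. The staging.orders_ccd table is the same hypothetical table sketched earlier; the IBMSNAP_* columns are the replication-maintained columns described in this list.

    -- Hypothetical queries against the staging.orders_ccd table sketched earlier.

    -- Inserts and updates, in commit order, to be applied to the warehouse:
    SELECT order_id, order_total, ibmsnap_operation, ibmsnap_logmarker
      FROM staging.orders_ccd
     WHERE ibmsnap_operation IN ('I', 'U')
     ORDER BY ibmsnap_commitseq, ibmsnap_intentseq;

    -- Deletes, routed to an audit feed instead of being applied to the warehouse:
    SELECT order_id, ibmsnap_logmarker
      FROM staging.orders_ccd
     WHERE ibmsnap_operation = 'D'
     ORDER BY ibmsnap_commitseq, ibmsnap_intentseq;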

With the staged data in the CCD, the InfoSphere DataStage job can be scheduled periodically (say, hourly) to read the CCD and perform the transform, now on a much larger set of data than is possible with the real-time event publishing integration. However, because Apply inserts new rows into the CCD table asynchronously with respect to the transform job reading it, you need a way to track which rows in the CCD have already been processed, as opposed to those that have newly arrived. The technique to track changes differs between SQL replication and Q replication.

SQL replication

With SQL replication, tables to be replicated are defined in a "subscription set" used by SQL Apply. Data is replicated by SQL Apply only when a subscription set periodically "wakes up" and performs a single cycle of replication processing. A subscription set can be configured to wake up either as a result of an event signal or on a timer interval (anywhere from a minute to hours). This means that during the interval when a subscription set is dormant, the contents of the CCD table are static. If you configure the subscription set to only perform a subscription cycle when the InfoSphere DataStage job is not running, you can do the following in the InfoSphere DataStage job (note that this job will be scheduled to run periodically):

  1. Read the entire CCD table
  2. Perform the transformation on the data
  3. Delete the entire content of the CCD table
  4. Issue an event signal to wake up the subscription set so it can populate the CCD table with new data (see the example after this list)
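
Step 4 can be accomplished by inserting a row into the SQL replication event table. The minimal sketch below assumes the subscription set was defined as event-driven with an event name of DSJOB_DONE (an illustrative name) and that the control tables use the default ASN schema; check the SQL replication documentation for the exact table and column definitions in your release.

    -- Hedged sketch: post an event so the event-driven subscription set
    -- runs one replication cycle. DSJOB_DONE is a made-up event name.
    INSERT INTO asn.ibmsnap_subs_event (event_name, event_time)
    VALUES ('DSJOB_DONE', CURRENT TIMESTAMP);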

Alternatively, the subscription set can be scheduled to wake up at a different time from when the InfoSphere DataStage job is scheduled to run. The danger here, of course, is that if for some reason both the InfoSphere DataStage job and the replication cycle run at the same time, you can get incorrect results.

Figure 8. Staged data integration architecture, including subscription cycle

(Part 2 of this series walks you through a detailed example.)

Q replication

Q Apply, unlike SQL Apply, runs continuously. You therefore don't have the option of scheduling the InfoSphere DataStage job to run at a time when the CCD table is not being changed by Apply. Instead, you have three options:

  • Run Q Apply with the option APPLYUPTO=CURRENT TIMESTAMP (a new option in 9.7 Fix Pack 2): The APPLYUPTO parameter tells Q Apply to apply changes to the CCD only up to a specific time, and then stop.

    The following steps occur within the InfoSphere DataStage job:

    1. Read the entire CCD table.
    2. Perform the transformation on the data.
    3. Delete the entire content of the CCD table.
    4. Start Q Apply with APPLYUPTO=CURRENT TIMESTAMP so that Q Apply can populate the CCD table with changes that have been captured since the last time Q Apply was run.
    5. Wait for Q Apply to complete.
    6. Iterate from Step 1.

    Q Apply therefore runs only once after each transform job has finished, and it populates the CCD table with new changes to be processed in the next InfoSphere DataStage job cycle. Note that this technique is only feasible if the interval between InfoSphere DataStage job runs is sufficiently small that the changes accumulating during this time can be buffered by MQ (as Q Apply will not be running).

  • Stop/Start Q Apply: This is similar to the previous option, except that instead of running Q Apply only once after every InfoSphere DataStage job cycle, you keep Q Apply running continuously and stop it only for the duration of the transform job:
    1. Stop Q Apply.
    2. Read the entire CCD table.
    3. Perform the transformation on the data.
    4. Delete the entire content of the CCD table.
    5. Start Q Apply.

    This avoids the problem of Q Apply being down between InfoSphere DataStage job cycles: Q Apply runs continually regardless of how much time elapses between job cycles.

  • Track CCD low/high watermarks: Store the minimum and maximum IBMSNAP_COMMITSEQ values at the beginning of the InfoSphere DataStage job, read only the contents of the CCD between these min and max values, and when the job completes, update these min/max watermarks to read the next set of changes. Q Apply never stops. During InfoSphere DataStage transformation processing, the CCD table will continue to accumulate changes, but they will not impact the InfoSphere DataStage job processing, as the new records will be outside of the processing range.

    The one complexity with this solution is that InfoSphere DataStage (the parallel edition, which is part of InfoSphere Information Server) does not support global variables as a way to track these min/max values. To persist these values, you have to write them either to a flat file or to a relational table. With a relational table, the value can be used in a DB2 Connector stage as a predicate when reading the CCD table. With a flat file, however, the values cannot be used within a DB2 Connector SQL statement (a restriction of the DB2 Connector stage), so you need a different technique to read the CCD table with the predicate. One possibility is to invoke the job with the dsjob command and pass the synchpoint value as a runtime parameter (a SQL sketch of this technique follows the steps below).

    1. Set GLOBAL_MINSYNCHPT=MIN(IBMSNAP_COMMITSEQ)-1 from CCD table (initialization only).
    2. Set GLOBAL_MAXSYNCHPT=MAX(IBMSNAP_COMMITSEQ) from CCD table.
    3. Read the CCD table for every row with IBMSNAP_COMMITSEQ > GLOBAL_MINSYNCHPT and <=GLOBAL_MAXSYNCHPT.
    4. Perform the transformation on the data read.
    5. Set GLOBAL_MINSYNCHPT=GLOBAL_MAXSYNCHPT.
    6. Repeat at Step 2.

    Note: Unlike the other options, this does not delete the contents of the CCD table as part of running the InfoSphere DataStage job. Consequently, the pruning of the CCD table is a separate process outside of the scope of InfoSphere DataStage. This of course has the advantage that the CCD can now be used for other purposes (such as auditing).
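
    Here is a minimal SQL sketch of the recurring cycle (steps 2, 3, and 5), under the assumption that the watermarks are persisted in a small relational control table, which the text above notes is one of the two options. The ETL_SYNCPOINT control table, its columns, the job name, and the staging.orders_ccd table are all hypothetical names for illustration; the comparison and MAX on the IBMSNAP_COMMITSEQ column rely on its binary (FOR BIT DATA) ordering in DB2.

        -- Hypothetical control table holding the last processed watermark.
        CREATE TABLE etl_syncpoint (
            job_name      VARCHAR(64) NOT NULL PRIMARY KEY,
            min_commitseq VARCHAR(16) FOR BIT DATA
        );

        -- Step 2: capture the current high watermark from the CCD table.
        SELECT MAX(ibmsnap_commitseq) AS max_commitseq
          FROM staging.orders_ccd;

        -- Step 3: read only the rows between the persisted low watermark and
        -- the high watermark just captured (passed in as a job parameter).
        SELECT order_id, order_total, ibmsnap_operation
          FROM staging.orders_ccd
         WHERE ibmsnap_commitseq > (SELECT min_commitseq
                                      FROM etl_syncpoint
                                     WHERE job_name = 'ORDERS_TRANSFORM')
           AND ibmsnap_commitseq <= :max_commitseq
         ORDER BY ibmsnap_commitseq, ibmsnap_intentseq;

        -- Step 5: after a successful run, advance the low watermark.
        UPDATE etl_syncpoint
           SET min_commitseq = :max_commitseq
         WHERE job_name = 'ORDERS_TRANSFORM';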

Figure 9. Real-time integration architecture with data flowing to the warehouse, both with and without ODS
Shows data flowing to the warehouse both from ODS and without ODS

Table 1. Comparison of integration options

The table compares four options: real time (event publishing), staged data with SQL replication, staged data with Q replication using start/stop of Q Apply, and staged data with Q replication tracking positions in the CCD.

  • Latency (time it takes for a change on the source to reach the InfoSphere DataStage job)
    - Real time (event publishing): Fastest; continuous replication with no Q Apply overhead. The InfoSphere DataStage job, however, must perform fast enough to consume the changes.
    - Staged data (SQL replication): If event-driven, changes are replicated only after an InfoSphere DataStage job processing cycle completes and the event is fired. If time interval-driven, changes are replicated after every specified time interval.
    - Staged data (Q replication, start/stop Q Apply): Changes are replicated continuously by Q Apply, except during InfoSphere DataStage job processing.
    - Staged data (Q replication, track positions in CCD): Changes are replicated continuously by Q Apply.
  • Where changes are buffered when the InfoSphere DataStage job has not yet processed them
    - Real time (event publishing): In the MQ queue.
    - Staged data (SQL replication): In the CCD table.
    - Staged data (Q replication, start/stop Q Apply): In the CCD table.
    - Staged data (Q replication, track positions in CCD): In the CCD table.
  • Administration overhead
    - Real time (event publishing): MQ monitoring (ensure the MQ queue does not overflow) and Q Capture monitoring.
    - Staged data (SQL replication): SQL Capture monitoring and SQL Apply monitoring.
    - Staged data (Q replication, start/stop Q Apply): Q Capture, MQ, and Q Apply monitoring.
    - Staged data (Q replication, track positions in CCD): Q Capture, MQ, and Q Apply monitoring.
  • Changed data usable by other applications
    - Real time (event publishing): If InfoSphere DataStage uses a non-destructive read of the queue, another application can read the queue.
    - Staged data (SQL replication): No; CCD data is deleted after InfoSphere DataStage job processing. A separate subscription can be created for other applications.
    - Staged data (Q replication, start/stop Q Apply): No; CCD data is deleted after InfoSphere DataStage job processing. A separate subscription can be created for other applications.
    - Staged data (Q replication, track positions in CCD): Yes; CCD data is not deleted after InfoSphere DataStage job processing.
  • Setup considerations
    - Real time (event publishing): Requires mapping of MQ messages to column definitions.
    - Staged data (SQL replication): Usual SQL replication setup considerations.
    - Staged data (Q replication, start/stop Q Apply): Usual Q replication setup considerations.
    - Staged data (Q replication, track positions in CCD): Requires manual pruning of the CCD table.

Conclusion

This article discussed the technologies of the InfoSphere Replication Server and InfoSphere DataStage products and the different ways of integrating the two to feed the data warehouse. It also provided pros and cons for the various integration options. Part 2 of this series will detail two specific integration options: with staging tables or with MQ messages. It will also cover the setup and configuration, with screen shots and step-by-step instructions.

Resources

Learn

Get products and technologies

  • Build your next development project with IBM trial software, available for download directly from developerWorks.

Discuss
