Guaranteed delivery with InfoSphere DataStage
InfoSphere DataStage provides powerful capabilities to extract data from a source system, transform it, and load it into a target system. Many DataStage users require solutions that guarantee that data is moved from the source system to the target system without any possibility of data loss. This article describes in detail various approaches that you can use with InfoSphere DataStage to guarantee the delivery of data from a source to a target. It covers the approach of using the Distributed Transaction stage (DTS) with a transaction manager for distributed XA transactions and without a transaction manager using local database transactions. It describes using multiple input links with a database connector to perform multiple database operations within a local transaction. It discusses how guaranteed delivery is accomplished when moving data from InfoSphere Data Replication to DataStage using the Change Data Capture Transaction stage. Finally, it describes the best practice of combining messaging stages with database stages to provide reliable data processing and transformation services.
Global transactions with the DTS
This section explains the XA 2-phase commit architecture at a high level, and it shows how the DTS interacts with WebSphere® MQ and database resources to accomplish global transactions.
The X/Open Group standard eXtended Architecture (XA) defines a protocol for updating multiple resources within a single transaction. A resource can be a database or it can be a messaging system, such as WebSphere MQ. Achieving atomicity, consistency, isolation, and durability (ACID) across multiple resources connected via unreliable network connections is a difficult problem to solve. XA achieves this by using a two-phase commit (2PC) protocol. A transaction manager manages the protocol, communicating with multiple resource managers that manage the database or messaging resource transactions. The two phases of the 2PC protocol are:
- Commit-request: In this phase, the transaction manager sends request messages to each resource. The resources prepare the transaction to the point of commit, without actually committing the transaction. For example, a resource could write the data to the database, but flag each record as uncommitted. The resource then sends a status message back to the transaction manager to indicate success or failure.
- Commit: If all of the resources reported success in the first phase, the transaction manager sends a commit message to each resource. The resources complete the transaction and then report their final status back to the transaction manager. If the transaction manager receives a failure message from any of the resources, it instead sends an abort message to all of the resources, which roll back their uncommitted transactions.
2PC works because if a resource reports success from the first phase, then it is issuing a guarantee that it will commit the data. Underlying the protocol is a set of handshaking messages that handle a variety of cleanup situations, such as network failures or failure of any components involved in the transaction.
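To make the two phases concrete, here is a minimal in-memory Python sketch of a coordinator. The `Resource` class and the `two_phase_commit` function are hypothetical illustrations, not part of the XA specification or the WebSphere MQ API. A real resource manager would also force its prepared state to a log so that it survives a crash, which this sketch omits.

```python
class Resource:
    """Hypothetical resource manager, such as a database or a queue."""

    def __init__(self, name, fail_prepare=False):
        self.name = name
        self.fail_prepare = fail_prepare  # simulate a failure in phase 1
        self.staged = []      # prepared, but not yet visible
        self.committed = []   # durable and visible

    def prepare(self, work):
        # Phase 1 (commit-request): stage the work, then vote yes or no.
        if self.fail_prepare:
            return False
        self.staged.append(work)
        return True

    def commit(self):
        # Phase 2 (commit): a "yes" vote in phase 1 promised this succeeds.
        self.committed.extend(self.staged)
        self.staged.clear()

    def rollback(self):
        # Phase 2 (abort): discard any staged work.
        self.staged.clear()


def two_phase_commit(participants):
    """participants: list of (Resource, work) pairs. Returns True on commit."""
    votes = [resource.prepare(work) for resource, work in participants]
    if all(votes):
        for resource, _ in participants:
            resource.commit()
        return True
    for resource, _ in participants:
        resource.rollback()
    return False
```

Every participant is asked to prepare before any of them commits, so a single "no" vote (here, `fail_prepare=True`) leaves all of the resources unchanged.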
The DTS takes advantage of 2PC by using the WebSphere MQ transaction manager, which is a standard feature of WebSphere MQ Server, to coordinate the transaction with the resource managers as shown in Figure 1:
Figure 1. The relationship between DTS, MQ, and resource managers
DTS first connects to WebSphere MQ and instructs the transaction manager to start the XA transaction. The transaction manager communicates with the resource managers to start the transactions. DTS then uses the DB2, Oracle, or WebSphere MQ connectors to perform write operations to the resources. Finally, DTS instructs the transaction manager to either commit or roll back the XA transaction.
DTS jobs typically use WebSphere MQ as the source of data, and databases as the target. By using DTS, delivery from the source queue to the target database is guaranteed: if the transaction is committed successfully, the source message is deleted from the source queue, and the data is written to the target database. If the job fails, nothing is written to the target database, and the message is retained on the source queue. This is guaranteed because the deletion of the source message is made within the same XA transaction as the updates to the target database, and so either both operations occur, or neither does.
DTS jobs often use work queues to partition the work into parallel pipelines. The use of work queues is best explained with the aid of a diagram, shown in Figure 2:
Figure 2. The use of work queues by the DTS
This job works with WebSphere MQ to guarantee delivery in the following way:
- Under local sync-point control, the MQ Connector reads a message from the source queue, performing a destructive read (that is, it removes the message from the queue).
- The MQ Connector writes the message to the work queue. If running in parallel, there will be multiple work queues, one per parallel pipeline. The MQ Connector then commits the local transaction. Note that the use of a local transaction here means that the movement of the message from source queue to work queue is guaranteed. If it should fail, the deletion is rolled back, the message is restored on the source queue, and the job is aborted.
- The DTS stage writes updates to the target database.
- The DTS stage deletes the message from the work queue.
By using work queues, the MQ source connector is able to run in parallel. Each instance of this stage reads different messages from the source queue because it is reading in a destructive fashion, and WebSphere MQ ensures that only one of multiple readers reads any particular source message.
When the transaction is committed, if all goes well, the message is deleted from the work queue and the database updates are committed, all as part of a single XA transaction. Should any failure occur, such as a database write error, the XA transaction is rolled back, which undoes any changes to the target database and restores the MQ message to the work queue. If the job is subsequently restarted, the MQ Connector first reads messages from the work queue and provides them to the job, and then continues reading from the source queue. This means that in the event of a failure, simply restarting the job allows it to continue processing messages from the point where it left off.
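The following Python sketch traces this flow for a single pipeline, with in-memory deques standing in for the source and work queues and a list standing in for the target table. The function names and the `SimulatedFailure` exception are hypothetical. Because everything runs in one thread, each "transaction" is modelled as a group of steps that are applied together or not at all, rather than by a real transaction manager.

```python
from collections import deque


class SimulatedFailure(Exception):
    """Stands in for a database write error or a job abort."""


def move_to_work_queue(source, work):
    # Local sync-point transaction: the destructive get from the source
    # queue and the put to the work queue take effect together.
    if source:
        work.append(source.popleft())


def dts_process(work, database, transform, fail_on=None):
    # One XA transaction per message: the database write and the deletion
    # from the work queue are applied together or not at all.
    while work:
        message = work[0]  # browse only; not removed until the commit
        if message == fail_on:
            raise SimulatedFailure(message)  # rollback: nothing has changed
        row = transform(message)
        database.append(row)  # commit, part 1: target update
        work.popleft()        # commit, part 2: delete from work queue


def run_job(source, work, database, transform, fail_on=None):
    # On restart, messages left on the work queue are processed first;
    # then the job continues reading from the source queue.
    dts_process(work, database, transform, fail_on)
    while source:
        move_to_work_queue(source, work)
        dts_process(work, database, transform, fail_on)
```

If the job fails while processing a message, that message stays on the work queue, untouched messages stay on the source queue, and a second call to `run_job` finishes the work without losing or duplicating any row.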
Local transactions with the DTS
As well as working with XA-compliant resources such as DB2 and Oracle, DTS also works with non-XA-compliant resources, such as ODBC and Teradata targets. This section explains how resources that do not support the XA architecture can still be used within DataStage jobs, how guaranteed delivery is still possible, and the pitfalls and benefits of this design pattern.
DTS works with non-XA resources by creating one or more "local" transactions in addition to the XA transaction. In this context, a resource is uniquely defined by the connection properties, such as the database and username used to access the database. If there are multiple links that access the same resource (that is, the same database, with the same user credentials), then a single transaction is used across those links. At commit time, the local transaction is committed before the XA transaction. This local commit either succeeds, which is the most likely case, or fails, which results in the transaction being rolled back by the resource. After the local transaction has been committed or rolled back, then the XA transaction is committed or rolled back. The possible commit or rollback scenarios when combining a local transaction with the XA transaction include the following:
- The local commit succeeds, then the XA commit succeeds (the normal case).
- The local commit succeeds, but the XA commit fails (very unlikely).
- The local commit fails. When this occurs, DTS rolls back the XA transaction. If DTS crashes or some other system failure occurs before the XA transaction is rolled back, the MQ transaction manager rolls it back automatically.
And for rollback:
- The local rollback succeeds, then the XA rollback occurs.
- The local rollback fails. This scenario is not actually possible; resources assume a rollback position unless they are asked to commit and can successfully follow through on that commit request.
Of these scenarios, the only one of concern is the second one, where the local transaction commit succeeds but the subsequent XA commit fails. In this case, the worst that happens is that the data is written to the target database under the local transaction, but the message remains on the source queue. If this should occur, as long as the job is idempotent, then the job can simply be run again. In none of the possible scenarios previously described is data ever lost. The following section explains how to create an idempotent job that can be restarted upon job failure.
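The commit ordering can be checked exhaustively with a small Python model. `commit_sequence` is a hypothetical helper that records only two facts after one commit attempt: whether the row reached the target, and whether the message is still on the queue. It assumes, as described above, that the local commit always happens before the XA commit.

```python
from itertools import product


def commit_sequence(local_commit_ok, xa_commit_ok):
    """Return (row_in_target, message_on_queue) after one commit attempt."""
    if not local_commit_ok:
        # The local resource rolls back, so DTS (or the MQ transaction
        # manager, if DTS has crashed) rolls back the XA transaction too.
        return False, True
    # The local commit succeeded, so the row is now durable in the target.
    # The source message is deleted only if the XA commit also succeeds.
    return True, not xa_commit_ok


# Enumerate every combination and confirm that no data is ever lost.
for local_ok, xa_ok in product([True, False], repeat=2):
    row_in_target, message_on_queue = commit_sequence(local_ok, xa_ok)
    assert row_in_target or message_on_queue, "data lost"
    if row_in_target and message_on_queue:
        print("replay possible: local commit succeeded, XA commit failed")
```

The replay line is printed exactly once, for the combination in which the local commit succeeds and the XA commit fails; that is the case that makes an idempotent job design necessary.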
Idempotency and DataStage jobs
This section explains how to write jobs so that they offer idempotent behavior; that is, they can be restarted upon failure without compromising data. Such jobs need to be able to accept data that they may have already processed, and either process it again or ignore it. To help explain this, consider a job that is not idempotent. Consider a simple file to Oracle connector job, shown in Figure 3:
Figure 3. A simple file to Oracle connector job
with the properties of the Oracle connector specified as shown in Figure 4:
Figure 4. Oracle connector properties
Assume that table TABLE_WITH_PK has a unique constraint, meaning that it cannot have two rows with the same key value. If this job runs a second time, it fails, because the same data is 'replayed' to the Oracle connector, which leads to duplicate key row errors. There are a number of possible ways to overcome this:
- Change the Write mode property to one of Insert new rows only, Insert then update, or Update then insert. The first option ignores rows that already exist in the database. The second executes an insert statement for the row and, if the insert fails because the row already exists, executes an update statement with the same data. The third is similar, but executes the two statements in the reverse order.
- Add a reject link to the target stage and configure it so that records that cause row errors are sent to the reject link.
- Use a sparse lookup stage to see whether a record already exists in the target. The results of the lookup can be used to determine whether to send records to the target stage, to ignore them, or to divert existing records to a log file, or other target.
Which of these approaches to use depends on the particular use case and whether processing is required on the source data. If transformations or other functions are required on the source data, it may be more efficient to determine that a record already exists by using a sparse lookup stage early in the job, such that the transformation can be skipped. If the source data is largely just passed straight through to the target, through a small number of intermediate stages, it may be more efficient to allow the data to reach the target connector and have that connector reject or ignore the data.
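As an illustration of how these write modes behave on a replay, the following Python sketch uses an in-memory SQLite database from the standard library. The functions are hypothetical stand-ins that emulate three of the behaviours described above with SQLite SQL; they are not how the Oracle connector implements its write modes.

```python
import sqlite3


def make_target():
    """In-memory table whose primary key enforces the unique constraint."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE TABLE_WITH_PK (ID INTEGER PRIMARY KEY, NAME TEXT)")
    return conn


def write_insert(conn, rows):
    # Plain insert: a replay raises sqlite3.IntegrityError, and the
    # context manager rolls back the whole transaction.
    with conn:
        conn.executemany("INSERT INTO TABLE_WITH_PK VALUES (?, ?)", rows)


def write_insert_new_rows_only(conn, rows):
    # Rows whose key already exists are silently ignored.
    with conn:
        conn.executemany("INSERT OR IGNORE INTO TABLE_WITH_PK VALUES (?, ?)", rows)


def write_insert_then_update(conn, rows):
    # Try the insert first; if the key already exists, update that row
    # with the same data instead.
    with conn:
        for key, name in rows:
            try:
                conn.execute("INSERT INTO TABLE_WITH_PK VALUES (?, ?)", (key, name))
            except sqlite3.IntegrityError:
                conn.execute("UPDATE TABLE_WITH_PK SET NAME = ? WHERE ID = ?",
                             (name, key))
```

Running `write_insert` twice with the same rows raises an `IntegrityError`, while the other two functions can be rerun safely; SQLite's default conflict handling aborts only the failing statement, so the update in `write_insert_then_update` still runs inside the same transaction.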
Idempotency can also be achieved by using additional target tables to store the transactional state. This requires a target stage that can write to multiple target tables as part of a single transaction. The database connectors have this capability, which is described in the following section.
Local transactions with database connectors
Since Information Server version 8.5, database connectors have provided the ability to support multiple input links where each link targets a particular database table and has its own set of properties, such as the write mode. These multiple links are all executed within the same database transaction, so the ACID capabilities required for guaranteed delivery are maintained: either all of the table updates are written, or none of them are.
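The all-or-nothing behaviour of multiple links can be illustrated with SQLite from the Python standard library. In this sketch, each list of rows plays the role of one input link; the `ORDERS` and `ORDER_LINES` tables and the `write_links` function are hypothetical, and a single `with conn:` block stands in for the connector's shared transaction.

```python
import sqlite3


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE ORDERS (ORDER_ID INTEGER PRIMARY KEY);
        CREATE TABLE ORDER_LINES (ORDER_ID INTEGER, LINE_NO INTEGER,
                                  PRIMARY KEY (ORDER_ID, LINE_NO));
    """)
    return conn


def write_links(conn, parent_rows, child_rows):
    # Both "links" share one transaction: if any write fails, the context
    # manager rolls back every table update made in this call.
    with conn:
        conn.executemany("INSERT INTO ORDERS VALUES (?)", parent_rows)
        conn.executemany("INSERT INTO ORDER_LINES VALUES (?, ?)", child_rows)
```

If the child rows contain a duplicate key, the parent row written earlier in the same call is rolled back as well, so the two tables never disagree.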
There are many use cases for such functionality, including:
- Writing related records, such as maintaining parent-child relationships within a single transaction.
- Using different write modes for each link, where each link is configured to update the same table. For example, link 1 may perform deletions from a table, while link 2 performs updates to the same table.
- Storing a transactional marker to hold the state of the transaction.
This latter approach is often used in checkpoint-restart scenarios to avoid resending the same data to the target database if a job has to be restarted after a failure. One way to accomplish this is to use a target table to store the number of the last processed row. Because this value is committed in the same transaction as the target updates, it is guaranteed to reflect precisely the last row that was committed. Early in the same job, a lookup stage reads this last processed value and passes it to a transformer stage that contains a constraint expression comparing each row number to that value. The transformer discards any row whose number is less than or equal to the last processed value, because such rows have already been processed by the target stage. As a result, only records that have not yet been processed by the target stage are delivered to that stage.
One more stage is required, which is a Wave Generator stage. When configured with multiple input links, database connectors only commit the transaction at the end of each wave. The Wave Generator offers a number of different ways of determining when to issue a wave marker. For this case, an absolute row count would suffice, such that the target connector commits the transaction every N rows.
The complete job looks like Figure 5:
Figure 5. Checkpoint-restart job
A job such as the one in Figure 5 solves the idempotency problem and guarantees the delivery of data from source to target.
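The checkpoint-restart logic of Figure 5 can be sketched in Python, with SQLite standing in for the target database. The `TARGET` and `CHECKPOINT` tables, the `run_job` function, and the `fail_at` parameter are hypothetical; the comments map each step to the stage it imitates. The sketch assumes that source rows are numbered from 1 and arrive in the same order on every run.

```python
import sqlite3


def make_target():
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE TARGET (ROW_NUM INTEGER PRIMARY KEY, VALUE TEXT);
        CREATE TABLE CHECKPOINT (ID INTEGER PRIMARY KEY CHECK (ID = 1),
                                 LAST_ROW INTEGER NOT NULL);
        INSERT INTO CHECKPOINT VALUES (1, 0);
    """)
    return conn


def run_job(conn, source_rows, wave_size, fail_at=None):
    # Lookup stage: read the number of the last committed row.
    last = conn.execute("SELECT LAST_ROW FROM CHECKPOINT").fetchone()[0]
    # Transformer constraint: drop rows numbered at or below that value.
    pending = [(n, v) for n, v in enumerate(source_rows, start=1) if n > last]
    # Wave Generator: group the remaining rows into waves of wave_size rows.
    for start in range(0, len(pending), wave_size):
        wave = pending[start:start + wave_size]
        # Target connector: one transaction per wave covers both the data
        # rows and the checkpoint update.
        with conn:
            for n, v in wave:
                if n == fail_at:
                    raise RuntimeError(f"simulated failure at row {n}")
                conn.execute("INSERT INTO TARGET VALUES (?, ?)", (n, v))
            conn.execute("UPDATE CHECKPOINT SET LAST_ROW = ?", (wave[-1][0],))
```

With seven source rows, a wave size of 3, and a failure at row 5, the first run commits rows 1 to 3 and rolls back row 4; the restarted run resumes at row 4. Because `ROW_NUM` is a primary key, any replayed row would raise an error, so a clean second run shows that nothing is sent twice.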
Other design patterns
This section explains some other patterns that can achieve guaranteed delivery with other resources.
Guaranteed delivery with InfoSphere Data Replication
The Change Data Capture Transaction stage (CDCTS) works with InfoSphere Data Replication (DR) to replicate and process mission-critical data events in real time. DR uses a log-based capture method to detect updates to various database systems and can then either replicate these updates to another database system or convey them to files, WebSphere MQ messages, or the CDCTS. The CDCTS communicates with DR over TCP/IP to receive these updates to the source database and to exchange status messages.
Figure 6 shows a simple DR DataStage job:
Figure 6. Data replication job
The CDC stage is configured with the name of the DR subscription and must have at least two output links: one or more of these links carry the database updates, and exactly one link passes the bookmark. The bookmark is similar to the last processed count described in the previous section. It is used to identify the last committed point of the current transaction. A CDCTS job uses a target database connector, which has at least two input links. One of these links carries the bookmark value and targets a simple table, called the bookmark table, that is created to store the bookmark value. As with the prior example, the target database connector commits the transaction at the end of transactional waves. The boundaries of these transactional waves are determined by DR. DR sends commit messages to CDCTS that align with the transaction boundaries of the updates to the source database. When CDCTS receives these commit messages, it sends out transactional wave markers to its output links. When the target database stage receives the wave markers on each link, it commits the transaction. Because a single transaction encompasses both the bookmark table and the database updates, the stored bookmark value is guaranteed to be in sync with the database updates.
As the job runs, DR periodically asks the CDCTS to report the last-committed bookmark value. The CDCTS queries the bookmark table via an ODBC connection and reports the bookmark back to DR. DR uses this to clean up logs because it can be certain that a particular transaction has been successfully written to the target table.
In the event of a job, network, or system failure, the job is left in an incomplete state. When the job is restarted, the CDCTS reports to DR the last-committed bookmark value, and DR then uses that information to determine which record to start from.
Once more, delivery from DR to the target database is guaranteed, and by the use of the bookmark mechanism, there are never duplicate records sent to the target database.
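A simplified Python model of the bookmark mechanism follows, with SQLite standing in for the target database. `SOURCE_LOG` is a hypothetical list of source transactions, each tagged with an increasing bookmark. The skip loop in `replicate` stands in for DR choosing where to resume; in a real deployment DR makes that decision from the bookmark that CDCTS reports, and the bookmark is DR's own value rather than a simple integer.

```python
import sqlite3

# Hypothetical source log: one entry per source transaction, tagged with an
# increasing bookmark and holding (customer_id, balance) changes.
SOURCE_LOG = [
    (101, [("A", 1), ("B", 2)]),
    (102, [("C", 3)]),
    (103, [("A", 10), ("D", 4)]),
]


def make_target():
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE CUSTOMER (CUST_ID TEXT PRIMARY KEY, BALANCE INTEGER);
        CREATE TABLE BOOKMARK (ID INTEGER PRIMARY KEY CHECK (ID = 1), POS INTEGER);
        INSERT INTO BOOKMARK VALUES (1, 0);
    """)
    return conn


def last_bookmark(conn):
    # The value that would be reported back to the replication engine.
    return conn.execute("SELECT POS FROM BOOKMARK").fetchone()[0]


def replicate(conn, log, crash_in=None):
    resume_after = last_bookmark(conn)
    for bookmark, changes in log:
        if bookmark <= resume_after:
            continue  # already committed on the target, so never resent
        with conn:  # one target transaction per source transaction
            for cust_id, balance in changes:
                conn.execute("INSERT OR REPLACE INTO CUSTOMER VALUES (?, ?)",
                             (cust_id, balance))
                if bookmark == crash_in:
                    raise RuntimeError("simulated failure mid-transaction")
            conn.execute("UPDATE BOOKMARK SET POS = ?", (bookmark,))
```

A failure partway through transaction 103 rolls back its first change together with the bookmark update, so the target still reflects transactions 101 and 102 exactly, and a restart applies only transaction 103.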
Guaranteed delivery with pipelined stages
There are other creative ways that jobs can be written to ensure guaranteed delivery. Most of these rely on the use of transactional wave markers, as seen in the previous examples. One such example is a case where data must be moved from a source queue to a target database, but the messaging system is not WebSphere MQ. DTS supports only WebSphere MQ, so achieving similar functionality with other messaging systems, such as Java Message Service (JMS), requires a job such as the one in Figure 7:
Figure 7. Pipeline job
The SourceMessage and DeleteMessage stages use a JMS solution coded on top of the Java Integration Stage. This JMS stage also outputs wave markers periodically. The solution works because the Teradata connector can be configured, from its reject link properties, to send successful records to its reject link, as shown in Figure 8:
Figure 8. Teradata connector reject properties
The sequence of events within the job execution is:
- The JMS stage reads the source message, but leaves it on the source queue. This stage also emits end-of-wave markers at prescribed intervals.
- The data from JMS is written to the Teradata database by the Teradata connector. If the write is successful, then the connector forwards the data to its reject link, which is configured to forward only successful records.
- When the Teradata connector receives the end-of-wave marker from the JMS source stage, it commits the transaction and forwards the end-of-wave marker to its output link.
- When the Remove Duplicates stage receives the end-of-wave marker, it removes duplicates and then sends its results, including an end-of-wave marker to its output.
- The final JMS stage deletes the source message when it receives an end-of-wave marker.
Notice that this solution would not work correctly without the end-of-wave marker, because the Teradata connector sends records to its output link after the data is written, not after it is committed. By using wave markers and including the Remove Duplicates stage, the job ensures that a source message is deleted only after the corresponding data has been committed.
There is a possibility that after the database transaction has been committed, the job fails. In this event, the final deletion of the source message does not occur, and the message is left on the source queue. No data is ever lost, but such a failure would mean that if the job is restarted, the Teradata connector sees the same data that it has already processed. For this reason, as with the use of local transactions with DTS, the job design needs to be idempotent. With a correctly constructed idempotent job, if the job is aborted for any reason, simply restarting it allows it to continue from where it left off.
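The ordering argument can be checked with a generator-based Python sketch, in which each generator plays one stage of Figure 7 and a sentinel object plays the end-of-wave marker. Everything here is hypothetical. The queue is a Python list of `(message_id, rows)` pairs rather than a JMS destination; the target table is a set, which makes replays harmless, much like the Insert new rows only write mode; and Remove Duplicates is assumed to collapse the several rows of one message to a single message ID.

```python
END_OF_WAVE = object()  # sentinel marker passed between stages


def jms_source(queue, wave_size):
    """Browse messages without removing them; mark every wave_size messages."""
    for i, (msg_id, rows) in enumerate(list(queue), start=1):
        for row in rows:
            yield (msg_id, row)
        if i % wave_size == 0:
            yield END_OF_WAVE
    yield END_OF_WAVE  # close the final, possibly partial, wave


def target_connector(records, table):
    """Forward written rows at once; forward the marker only after commit."""
    staged = []
    for rec in records:
        if rec is END_OF_WAVE:
            table.update(staged)  # commit the wave's transaction
            staged = []
            yield END_OF_WAVE
        else:
            staged.append(rec)  # written, but not yet committed
            yield rec           # sent on the reject link immediately


def remove_duplicates(records):
    """Buffer a wave, then emit each message ID once, then the marker."""
    wave = []
    for rec in records:
        if rec is END_OF_WAVE:
            yield from dict.fromkeys(msg_id for msg_id, _ in wave)
            wave = []
            yield END_OF_WAVE
        else:
            wave.append(rec)


def delete_messages(msg_ids, queue, crash_on_wave=None):
    """Delete the wave's messages from the source queue at each marker."""
    pending, wave_no = [], 0
    for item in msg_ids:
        if item is END_OF_WAVE:
            wave_no += 1
            if wave_no == crash_on_wave:
                raise RuntimeError("simulated failure after commit")
            queue[:] = [m for m in queue if m[0] not in pending]
            pending = []
        else:
            pending.append(item)


def run_job(queue, table, wave_size, crash_on_wave=None):
    stream = remove_duplicates(target_connector(jms_source(queue, wave_size), table))
    delete_messages(stream, queue, crash_on_wave)
```

Because the generators are pulled lazily, a message ID reaches `delete_messages` only after the target has committed that wave. A crash just before deletion leaves the committed messages on the queue, and rerunning the job replays them into the set without creating duplicates.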
This article describes a number of approaches and methodologies that can be used to guarantee delivery of data from a source system to a target system via DataStage jobs. There is not a single solution that works best in all circumstances due to the different nature of the resources being accessed, so a combination of techniques is required. Factors that determine which approach to use include whether or not the resource supports XA transactions, the size of the transactional data, latency requirements, and restartability requirements.
The author sincerely thanks his colleagues Tony Curcio and Ernie Ostic for reviewing and providing their valuable feedback that helped to refine this article.