CONNX CDC
The CONNX CDC origin reads mainframe change data provided by the CONNX data synchronization tool, DataSync. To read mainframe data from a CONNX server using a SQL query, use the CONNX origin instead.

Connecting to CONNX requires a Mainframe Collector license. The license allows reading mainframe data from CONNX and writing the data to cloud destinations. Contact your IBM StreamSets account manager for more information.

For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
The CONNX CDC origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
When you configure the CONNX CDC origin, you specify connection information and any custom JDBC configuration properties to determine how the origin connects to CONNX. You can also use a connection to configure the origin.
You specify the DataSync transformation that defines the change data to process. When the source database has high-precision timestamps, such as IBM Db2 TIMESTAMP(9) fields, you can configure the origin to write strings rather than datetime values to maintain the precision.
You can specify custom properties that your driver requires, configure connection properties, and specify what the origin does when it encounters an unsupported data type. By default, the origin generates JDBC record header and field attributes that provide additional information about each record and field.
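The custom driver properties mentioned above are standard JDBC connection properties. The sketch below shows how a JDBC client such as this origin assembles credentials and custom properties before connecting; the URL format and the `loginTimeout` property are illustrative placeholders, not documented CONNX values, so consult the CONNX documentation for the real URL syntax and property names.

```java
import java.util.Properties;

public class ConnxConnectionSketch {
    // Assembles the properties passed to the JDBC driver: credentials plus
    // any custom key/value pairs the driver requires.
    static Properties buildProperties(String user, String password, String[][] custom) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        for (String[] kv : custom) {
            props.setProperty(kv[0], kv[1]); // custom JDBC configuration properties
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical CONNX JDBC URL -- placeholder only; check the CONNX
        // documentation for the actual URL format, gateway host, and port.
        String url = "jdbc:connx:DD=MyCdd;GATEWAY=gateway.example.com;PORT=7500";
        Properties props = buildProperties("dbuser", "secret",
                new String[][] {{"loginTimeout", "30"}});
        System.out.println(url + " with " + props.size() + " properties");
        // With a reachable CONNX server, the client would then call:
        // Connection conn = DriverManager.getConnection(url, props);
    }
}
```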
When a pipeline stops, the CONNX CDC origin notes where it stops reading. When the pipeline starts again, the origin continues processing from where it stopped by default. You can reset the origin to process all requested data.
The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Prerequisites
Connecting to CONNX requires a Mainframe Collector license. The license allows reading mainframe data from CONNX and writing the data to cloud destinations. Contact your IBM StreamSets account manager for more information.
Before using the CONNX CDC origin, you must install the CONNX driver.
Before you build a CONNX pipeline, you must install the CONNX SQL engine local to the mainframe machine. For information about installing a CONNX SQL engine, see the CONNX documentation.
DataSync Transformation
The CONNX CDC origin processes change data provided by CONNX DataSync. To process change data, you must create a DataSync transformation for change data, and configure the transformation to manage changes for the data that you want to process.
For more information about creating a DataSync change data capture synchronization transformation, see the CONNX documentation.
Installing the CONNX JDBC Driver
Before using the CONNX CDC origin, install the CONNX JDBC driver for the server. The origin cannot access the server until you install this driver.
For information about installing additional drivers, see Install External Libraries in the Data Collector documentation.
- Download the CONNX JDBC driver from the CONNX website.
- Install the driver as an external library into the CONNX stage library, streamsets-datacollector-connx-lib, which includes the origin.
Supported Destinations
The destinations that you can use in a CONNX pipeline depend on your Mainframe Collector license. For more information, see Prerequisites.
- Amazon S3
- Azure Data Lake Storage Gen2
- Azure Synapse
- Databricks Delta Lake
- JDBC Producer to write to Amazon Redshift
- Google BigQuery
- Google Cloud Storage
- Snowflake
Processing CONNX CDC Data
The CONNX CDC origin works with a CONNX DataSync transformation to process CONNX change data. At a high level, DataSync uses snapshots, finalize instructions, and savepoints to perform change data capture synchronization.
DataSync stores snapshots of the state of specified tables. The finalize instruction indicates the last snapshot that was processed. The savepoint indicates the snapshot up to which you want data. So when a savepoint is created, DataSync reviews the finalize and savepoint snapshots and provides the data needed to get from the finalize snapshot to the savepoint snapshot, populating insert, update, and delete tables with the required updates.
When you start a pipeline, the CONNX CDC origin generates a DataSync savepoint. DataSync then populates the insert, update, and delete tables with the data required to get from the last finalize point to the savepoint. The CONNX CDC origin processes the data and passes it to downstream pipeline stages. After all data in the insert, update, and delete tables is processed, the CONNX CDC origin generates finalize instructions for DataSync, which creates a finalize snapshot that indicates the data that was processed.
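The savepoint and finalize bookkeeping described above can be sketched as follows, modeling snapshots as increasing sequence numbers. This is an illustration of the cycle, not the CONNX or DataSync API.

```java
// Models the savepoint/finalize cycle: a savepoint marks how far to sync,
// and finalize instructions mark how far processing actually got.
public class SavepointCycle {
    long lastFinalized = 0; // last snapshot whose changes were fully processed
    long savepoint = 0;     // snapshot the origin asked DataSync to sync up to

    // Pipeline start: the origin requests a new savepoint.
    void createSavepoint(long latestSnapshot) {
        savepoint = latestSnapshot;
    }

    // DataSync populates the insert, update, and delete tables with the
    // changes needed to move from the finalize snapshot to the savepoint.
    String pendingRange() {
        return "(" + lastFinalized + ", " + savepoint + "]";
    }

    // After all rows in the change tables are processed, the origin issues
    // finalize instructions, marking the savepoint as fully processed.
    void finalizeSavepoint() {
        lastFinalized = savepoint;
    }

    public static void main(String[] args) {
        SavepointCycle cycle = new SavepointCycle();
        cycle.createSavepoint(42);
        System.out.println("processing changes in snapshot range " + cycle.pendingRange());
        cycle.finalizeSavepoint();
        System.out.println("finalized through snapshot " + cycle.lastFinalized);
    }
}
```

If the pipeline stops before `finalizeSavepoint()` runs, `lastFinalized` stays behind, which is why a restart can re-deliver already-processed changes (see Recovery Handling below).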
Recovery Handling
When a CONNX CDC pipeline comes to an unexpected stop, you might need to review and remove some of the generated data from the pipeline destinations to avoid duplicate or inaccurate data.
Under normal conditions, when a CONNX CDC pipeline stops, the origin generates finalize instructions to indicate where processing ended. If the pipeline stops unexpectedly, such as a sudden shutdown of the Data Collector machine, the origin is unable to complete processing as expected or to generate the appropriate finalize instructions.
When the pipeline restarts, the origin creates a new savepoint, and DataSync reviews the finalize and savepoint snapshots, as usual, and provides the required data to get from the finalize snapshot to the savepoint snapshot. However, some of that data was already processed before the pipeline stopped unexpectedly.
As a result, after an unexpected pipeline stop, best practice is to check Data Collector logs for the timestamp of the last generated finalize instructions. Then review the data written to destination systems after that timestamp, and remove it if needed, before starting the pipeline again.
Record Attributes
The CONNX CDC origin includes the sdc.operation.type record header attribute in generated records to specify the CRUD operation associated with each record. The origin can also add JDBC record header attributes and field attributes to generated records.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
CRUD Operation Header Attributes
The CONNX CDC origin includes the CRUD operation type in the sdc.operation.type record header attribute. The origin uses the following values in the attribute to represent the operation type:

- 1 for Insert
- 2 for Delete
- 3 for Update
- 4 for Upsert
- 5 for unsupported operations
If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
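When manipulating the attribute in a scripting processor, the numeric codes above can be decoded with a small lookup. This snippet simply mirrors the table in the text; it is not a StreamSets API.

```java
public class CrudOperation {
    // Maps an sdc.operation.type value to its operation name,
    // following the value list documented above.
    static String name(int operationType) {
        switch (operationType) {
            case 1: return "INSERT";
            case 2: return "DELETE";
            case 3: return "UPDATE";
            case 4: return "UPSERT";
            case 5: return "UNSUPPORTED";
            default:
                throw new IllegalArgumentException("Unknown operation type: " + operationType);
        }
    }

    public static void main(String[] args) {
        // e.g. a record arrives with header attribute sdc.operation.type = "2"
        System.out.println(name(Integer.parseInt("2"))); // prints DELETE
    }
}
```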
JDBC Header Attributes
By default, the CONNX CDC origin generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record. The origin receives these details from the JDBC driver.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.

JDBC header attributes include a user-defined prefix to differentiate them from other record header attributes. By default, the prefix is jdbc.
You can change the prefix that the origin uses, or configure the origin not to create JDBC header attributes, using the JDBC Header Prefix and Create JDBC Header Attributes properties on the Advanced tab.
| JDBC Header Attribute | Description |
| --- | --- |
| &lt;JDBC prefix&gt;.tables | Provides a comma-separated list of source tables for the fields in the record. Note: Not all JDBC drivers provide this information. |
| &lt;JDBC prefix&gt;.&lt;column name&gt;.jdbcType | Provides the numeric value of the original SQL data type for each field in the record. See the Java documentation for a list of the data types that correspond to numeric values. |
| &lt;JDBC prefix&gt;.&lt;column name&gt;.precision | Provides the original precision for all numeric and decimal fields. |
| &lt;JDBC prefix&gt;.&lt;column name&gt;.scale | Provides the original scale for all numeric and decimal fields. |
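The numeric value in the jdbcType attribute corresponds to a java.sql.Types constant, which the standard java.sql.JDBCType enum can translate to a readable name:

```java
import java.sql.JDBCType;

public class JdbcTypeLookup {
    public static void main(String[] args) {
        // java.sql.Types constants: VARCHAR = 12, TIMESTAMP = 93, OTHER = 1111
        System.out.println(JDBCType.valueOf(12));   // VARCHAR
        System.out.println(JDBCType.valueOf(93));   // TIMESTAMP
        // 1111 (OTHER) is the "Unsupported type 1111" seen in JDBC_37 errors
        System.out.println(JDBCType.valueOf(1111)); // OTHER
    }
}
```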
JDBC Field Attributes
The CONNX CDC origin generates field attributes for columns converted to the Decimal or Datetime data types in Data Collector. The attributes provide additional information about each field for the following conversions:
- Decimal and Numeric data types are converted to the Data Collector Decimal data type, which does not store scale and precision.
- The Timestamp data type is converted to the Data Collector Datetime data type, which does not store nanoseconds.
| Data Collector Data Type | Generated Field Attribute | Description |
| --- | --- | --- |
| Decimal | precision | Provides the original precision for every decimal or numeric column. |
| Decimal | scale | Provides the original scale for every decimal or numeric column. |
| Datetime | nanoSeconds | Provides the original nanoseconds for every timestamp column. |
You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attributes. For more information about working with field attributes, see Field Attributes.
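As an illustration of how the scale attribute can be applied downstream, the standard BigDecimal API can restore a value's declared scale. The DECIMAL(10,3) column and the helper below are hypothetical examples, not part of the origin.

```java
import java.math.BigDecimal;

public class RestoreScale {
    // Re-applies the declared scale carried in the field's "scale"
    // attribute, e.g. restoring trailing zeros dropped in transit.
    static BigDecimal withDeclaredScale(String value, String scaleAttr) {
        return new BigDecimal(value).setScale(Integer.parseInt(scaleAttr));
    }

    public static void main(String[] args) {
        // A DECIMAL(10,3) column value arrives as 123.4 with scale attribute "3"
        System.out.println(withDeclaredScale("123.4", "3")); // prints 123.400
    }
}
```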
Header Attributes with the Drift Synchronization Solution
When you use the CONNX CDC origin with the Drift Synchronization Solution, ensure that the origin creates JDBC header attributes. JDBC header attributes allow the Hive Metadata processor to use the precision and scale information in the attributes to define decimal fields. The origin creates JDBC header attributes, by default.
- In the origin, on the Advanced tab, make sure that the Create JDBC Header Attributes property is selected.
- On the same tab, you can optionally change the default for the JDBC Header Prefix property.
- If you changed the default value for the JDBC Header Prefix property, then on the Hive tab of the Hive Metadata processor, configure the Decimal Precision Expression and Decimal Scale Expression properties. Update the jdbc. string in each property to match the specified JDBC header prefix. If you did not change the JDBC Header Prefix default value, then use the default expressions for the properties.
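For reference, the default expressions have the following general shape (shown here from memory of the Hive Metadata processor defaults, so verify against your processor configuration); with a custom prefix such as cdc, the 'jdbc.' literal in each expression would become 'cdc.':

```
${record:attribute(str:concat(str:concat('jdbc.', field:field()), '.precision'))}
${record:attribute(str:concat(str:concat('jdbc.', field:field()), '.scale'))}
```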
Event Generation
The CONNX CDC origin can generate events that you can use in an event stream. When you enable event generation, the origin generates events when it completes processing queries, such as insert, delete, or update queries, and when a query fails to complete. The origin also generates events when creating savepoints and finalize instructions. For more information about savepoints and finalize instructions, see Processing CONNX CDC Data. You can use the events in any logical way, for example:
- With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

  When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to run in incremental mode, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. In contrast, if you configure the origin to run in full mode, when you restart the pipeline, the origin uses the initial offset, if specified.

  For an example, see Stopping a Pipeline After Processing All Available Data.
- With the Email executor to send a custom email after receiving an event.

  For an example, see Sending Email During Pipeline Processing.
- With a destination to store information about completed queries.

  For an example, see Preserving an Audit Trail of Events.
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Event Records
Event records generated by the CONNX CDC origin include the following event-related record header attributes:

| Record Header Attribute | Description |
| --- | --- |
| sdc.event.type | Event type. Set to one of the event types listed below. |
| sdc.event.version | Integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The origin generates the following event types:

- connx-delete-success: Generated when the origin completes processing the data returned from a delete query.
- connx-insert-success: Generated when the origin completes processing the data returned from an insert query.
- connx-update-success: Generated when the origin completes processing the data returned from an update query.
- connx-finalize-success: Generated when the origin creates finalize instructions to indicate that it processed all of the data generated by DataSync to reach the latest savepoint.
- connx-savepoint-success: Generated after the origin successfully creates a savepoint.
- jdbc-query-failure: Generated when the origin fails to complete processing the data returned from a query.
CONNX SQL Data Types
The CONNX CDC origin converts CONNX SQL data types to the following Data Collector data types:

| CONNX SQL Data Type | Data Collector Data Type |
| --- | --- |
| BigInt | Long |
| Binary | Byte Array |
| Bit | Boolean |
| Char, Nchar | String |
| Date | Date |
| Decimal | Decimal |
| Double | Double |
| Float, QFloat | Float |
| Integer | Int |
| Numeric | Decimal |
| Real | Real |
| Smallint, Tinyint | Short |
| Time | Time |
| Timestamp | Datetime |
| Varbinary, Longvarbinary | Byte Array |
| Varchar, Longvarchar | String |
| Varnchar, Longnvarchar | String |
Unsupported Data Types
You can configure how the origin handles records that contain unsupported data types. The origin can take one of the following actions:

- Stops the pipeline: If the stage encounters an unsupported data type, the stage stops the pipeline after completing the processing of the previous records and displays the following error:

  JDBC_37 - Unsupported type 1111 for column.
- Converts to string: If the stage encounters an unsupported data type, the stage converts the data to string when possible, and then continues processing. Not all unsupported data types can successfully be converted to string. When using this option, verify that the data is converted to string as expected.
Configuring a CONNX CDC Origin
Configure a CONNX CDC origin to process mainframe change data provided by a CONNX DataSync transformation. Before using this origin in a pipeline, perform the prerequisite tasks.