Processing changed data

Certain stages let you process data changes, such as change data capture (CDC) data or transactional data, in a flow.

CDC-enabled sources can read change capture data. Some read change capture data exclusively; others can be configured to read it. When reading changed data, they determine the CRUD operation associated with each record, such as insert, update, upsert, or delete, and store it in the sdc.operation.type record header attribute.

CRUD-enabled processors and targets can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.

Using a CDC-enabled source and CRUD-enabled stages in a flow allows you to easily write changed data from one system into another. You can also use a CDC-enabled source to write to non-CRUD targets, and non-CDC sources to write to CRUD-enabled stages. For information on how that works, see Use cases.

CRUD operation header attribute

CDC-enabled sources include the sdc.operation.type record header attribute in all records when reading changed data.

CRUD-enabled processors and targets can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.

The sdc.operation.type record header attribute uses the following integers to represent CRUD operations:
  • 1 for INSERT records
  • 2 for DELETE records
  • 3 for UPDATE records
  • 4 for UPSERT records
  • 5 for unsupported operations or codes
  • 6 for UNDELETE records
  • 7 for REPLACE records
  • 8 for MERGE records
Note: Some sources use only a subset of the operations, based on the operations supported by the source system. Similarly, targets recognize only the subset of the operations that the target systems support. See the source and target documentation for details about supported operations.
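
The codes listed above can be modeled as a small lookup. The following is a minimal Python sketch for illustration only; it is not part of Data Collector. The names CrudOperation and decode_operation are hypothetical, and reporting an unrecognized value as code 5 is an assumption made for this example.

```python
from enum import IntEnum


class CrudOperation(IntEnum):
    """Integer codes used in the sdc.operation.type header attribute."""
    INSERT = 1
    DELETE = 2
    UPDATE = 3
    UPSERT = 4
    UNSUPPORTED = 5
    UNDELETE = 6
    REPLACE = 7
    MERGE = 8


def decode_operation(value):
    """Map a header attribute value (string or int) to a CrudOperation.

    Assumption: anything that is not a valid code is reported as UNSUPPORTED.
    """
    try:
        return CrudOperation(int(value))
    except (TypeError, ValueError):
        return CrudOperation.UNSUPPORTED
```

For example, decode_operation("3") yields CrudOperation.UPDATE, while a missing or non-numeric value yields CrudOperation.UNSUPPORTED.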

Earlier implementations

Some sources were enabled for CDC using different record header attributes in earlier releases, but they all now include the sdc.operation.type record header attribute. All earlier CRUD header attributes are retained for backward compatibility.

Similarly, CRUD-enabled targets that were enabled to look for the CRUD operation type in other header attributes can now look for the sdc.operation.type record header attribute first and check the alternate attribute afterwards. The alternate header attribute functionality is retained for backward compatibility.
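
The lookup order described above, sdc.operation.type first and the alternate attribute afterwards, can be sketched as follows. This is a hedged Python illustration over a plain dictionary of header attributes, not the actual target implementation. The function name find_operation is hypothetical, and the format of the value stored in an alternate attribute is not covered here.

```python
def find_operation(headers, alternate_attribute):
    """Return the operation value from sdc.operation.type when it is set,
    otherwise the value of the alternate header attribute (or None).
    """
    value = headers.get("sdc.operation.type")
    if value is not None and value != "":
        return value
    # Fall back to the attribute used by the earlier implementation.
    return headers.get(alternate_attribute)
```

With both attributes present, the sdc.operation.type value wins; the alternate attribute, such as oracle.cdc.operation, is consulted only when sdc.operation.type is missing or empty.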

CDC-enabled stages

CDC-enabled stages provide the CRUD operation type in the sdc.operation.type record header attribute. Some sources provide alternate and additional header attributes.

The following stages provide CRUD record header attributes:
CDC-Enabled Stage CRUD Record Header Attributes
Aurora PostgreSQL CDC Client Either includes the CRUD operation type in the record or includes the operation type in two record header attributes:
  • sdc.operation.type
  • postgres.cdc.operation

Includes additional CDC information in record header attributes with the postgres.cdc prefix, such as postgres.cdc.lsn.

For more information, see Record contents and generated records.

MongoDB Atlas CDC Includes the CRUD operation type in the sdc.operation.type record header attribute.

Can include additional CDC information in record header attributes, such as the op and ns attributes.

For more information, see Generated records.

MySQL Binary Log Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes additional CDC information in record fields.

For more information, see Generated records.

Oracle CDC Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD operation header attributes.

Includes additional CDC information in record header attributes with the oracle.cdc prefix, such as oracle.cdc.sequence.oracle.

Oracle CDC Client Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD operation header attributes.

Includes additional CDC information in record header attributes with the oracle.cdc prefix, such as oracle.cdc.table.

Oracle Multitable Consumer Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD operation header attributes.

Oracle XStream Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD operation header attributes.

PostgreSQL CDC Client Either includes the CRUD operation type in the record or includes the operation type in two record header attributes:
  • sdc.operation.type
  • postgres.cdc.operation

Includes additional CDC information in record header attributes with the postgres.cdc prefix, such as postgres.cdc.lsn.

For more information, see Record contents and generated records.

Salesforce Includes the CRUD operation type in the sdc.operation.type record header attribute.

For more information, see CRUD operation header attribute.

SQL Parser Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see Generated Records.

SQL Server CDC Client Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes CDC information in header attributes named jdbc.<CDC column name>. For more information, see Record header attributes.

SQL Server Change Tracking Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes additional information from change tables in the jdbc.SYS_CHANGE header attributes. For more information, see Record header attributes.

Data changes in SQL Server or Azure SQL Database

SQL Server and Azure SQL Database offer multiple methods to track data changes. In Data Collector, the appropriate source depends on the method that the database uses to track changes:
  • CDC tables: SQL Server CDC Client
  • Change tracking tables: SQL Server Change Tracking
  • Temporal tables: JDBC Multitable Consumer or JDBC Query Consumer

For more information about CDC tables, change tracking tables, and temporal tables, see the Microsoft documentation.

CRUD-enabled stages

The following stages recognize CRUD operations stored in record header attributes and can perform writes based on those values. Some stages also provide CRUD-related properties.
CRUD-Enabled Stage Supported Operations Stage Processing
JDBC Tee processor
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

A Change Log property enables processing records based on the CDC-enabled source in the flow.

For more information, see CRUD operation processing.

Azure Synapse SQL target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD operation processing.

Couchbase target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Write Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD operation processing.

Databricks target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD operation processing.

Elasticsearch target
  • CREATE (INSERT)
  • UPDATE
  • INDEX (UPSERT)
  • DELETE
  • UPDATE with doc_as_upsert (MERGE)
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD operation processing.

Google BigQuery target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD operation processing.

JDBC Producer target
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

A Change Log Format property enables processing records based on the CDC-enabled source in the flow.

For more information, see CRUD operation processing.

MongoDB Atlas target
  • INSERT
  • UPDATE
  • REPLACE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Upsert property in the stage

For more information, see Define the CRUD operation.

Oracle target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD operation processing.

Redis target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see Define the CRUD operation.

Salesforce target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
  • UNDELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD operation processing.

Salesforce Bulk API 2.0 target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD operation processing.

SingleStore target
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

A Change Log Format property enables processing records based on the CDC-enabled source in the flow.

For more information, see CRUD operation processing.

Snowflake target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD operation processing.

Teradata target
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD operation processing.

Processing the record

Change logs can provide record data in different formats. The JDBC Tee processor and JDBC Producer target can decode most change log formats to generate record data based on the source change log. When using other CRUD-enabled targets, you might need to add additional processing to the flow to alter the format of the record.

For example, Microsoft SQL CDC records created by the JDBC Query Consumer source contain CDC fields in the record, in addition to record data. You might use a Field Remover processor to drop any unnecessary fields from the record.

In contrast, the MySQL Server binary logs read by the MySQL Binary Log source provide new or updated data in a New Data map field and changed or deleted data in a Changed Data map field. You might want to use the Field Flattener processor to flatten the map field with the data that you need, and a Field Remover processor to remove any unnecessary fields.

For details on the format of generated records, see the documentation for the CDC-enabled source.
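
To make the flatten-and-remove idea concrete, here is a rough Python sketch that treats a record as a nested dictionary. It only illustrates the transformation; it does not use the Field Flattener or Field Remover processors. The field names in the example (Table, Offset, Data) and the "." separator are hypothetical.

```python
def flatten(value, prefix="", separator="."):
    """Flatten nested dicts into a single-level dict with joined key names."""
    flat = {}
    for key, item in value.items():
        name = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(item, dict):
            flat.update(flatten(item, name, separator))
        else:
            flat[name] = item
    return flat


def keep_only(record, wanted_map_field):
    """Flatten one map field and drop every other field from the record."""
    return flatten(record.get(wanted_map_field, {}))


# Hypothetical change record: CDC metadata plus one map field with the data.
record = {
    "Table": "customers",
    "Offset": 1234,
    "Data": {"id": 7, "address": {"city": "Oslo"}},
}
cleaned = keep_only(record, "Data")
```

Here cleaned contains only the flattened contents of the Data map field, with the nested city value stored under the key address.city.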

Use cases

You can use CDC-enabled sources and CRUD-enabled targets in flows together or individually. Here are some typical use cases:

CDC-enabled source with CRUD-enabled targets

You can use a CDC-enabled source and a CRUD-enabled target to easily process changed records and write them to a target system.

For example, say you want to write CDC data from Microsoft SQL Server to Elasticsearch. To do this, you use the CDC-enabled SQL Server CDC Client source to read data from a Microsoft SQL Server change capture table. The source places the CRUD operation type in the sdc.operation.type header attribute, in this case: 1 for INSERT, 2 for DELETE, 3 for UPDATE.

You configure the flow to write to the CRUD-enabled Elasticsearch target. In the Elasticsearch target, you can specify a default operation for any record with no value set in the sdc.operation.type attribute, and you can configure error handling for invalid values. You set the default to INSERT and you configure the target to use this default for invalid values. In the sdc.operation.type attribute, the Elasticsearch target supports 1 for INSERT, 2 for DELETE, 3 for UPDATE, and 4 for UPSERT.

When you run the flow, the SQL Server CDC Client source determines the CRUD operation type for each record and writes it to the sdc.operation.type record header attribute. The Elasticsearch target then uses the operation in the sdc.operation.type attribute to tell the Elasticsearch target system how to process each record. Any record with no value in the sdc.operation.type attribute, such as a record created by the flow, is treated as an INSERT record. Any record with an invalid value is handled the same way.
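
The decision described in this use case can be sketched in Python as follows. This is an illustration of the configured behavior only, assuming the "use default" choice for invalid values; other error handling options are not modeled. The function name resolve_operation and its parameters are hypothetical.

```python
def resolve_operation(headers, supported, default_code):
    """Pick the operation code that applies to one record.

    Records with no sdc.operation.type value, or with a value outside the
    supported set, fall back to default_code.
    """
    raw = headers.get("sdc.operation.type")
    try:
        code = int(raw)
    except (TypeError, ValueError):
        # Attribute missing or not a number: use the default operation.
        return default_code
    return code if code in supported else default_code


# Codes named in the use case: 1 INSERT, 2 DELETE, 3 UPDATE, 4 UPSERT.
SUPPORTED = {1, 2, 3, 4}
DEFAULT_INSERT = 1
```

A record read from the change capture table with sdc.operation.type set to 2 resolves to DELETE, while a record created in the flow without the attribute resolves to the INSERT default.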

CDC-enabled source to non-CRUD targets

If you need to write changed data to a target system without a CRUD-enabled target, you can use an Expression Evaluator processor or any scripting processor to move the CRUD operation information from the sdc.operation.type header attribute to a field, so the information is retained in the record.

For example, say you want to read from Oracle LogMiner redo logs and write the records to Google Cloud Storage with all of the CDC information in record fields. To do this, you'd use the Oracle CDC Client source to read the redo logs, then add an Expression Evaluator to pull the CRUD information from the sdc.operation.type header attribute into the record. Oracle CDC Client writes additional CDC information, such as the table name and SCN, into oracle.cdc header attributes, so you can use expressions to pull that information into the record as well. Then you can use the Google Cloud Storage target to write the enhanced records to Google Cloud Storage.
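
As a rough illustration of moving header information into the record, the following Python sketch copies the operation type and any oracle.cdc header attributes into fields. It stands in for what you would configure in an Expression Evaluator or scripting processor and is not that processor's syntax. The output field names (operation_type and the cdc_ prefix) are hypothetical, and oracle.cdc.scn is assumed as the name of the SCN attribute.

```python
def headers_to_fields(record_fields, headers, prefix="oracle.cdc."):
    """Return a copy of the record fields enriched with CDC header values."""
    enriched = dict(record_fields)
    # Keep the CRUD operation code in a regular field.
    enriched["operation_type"] = headers.get("sdc.operation.type")
    # Copy every header attribute that starts with the given prefix.
    for name, value in headers.items():
        if name.startswith(prefix):
            enriched["cdc_" + name[len(prefix):]] = value
    return enriched


headers = {
    "sdc.operation.type": "3",
    "oracle.cdc.table": "ORDERS",
    "oracle.cdc.scn": "12345",  # assumed attribute name for the SCN
}
enriched = headers_to_fields({"ORDER_ID": 42}, headers)
```

The enriched record keeps the original ORDER_ID field and gains operation_type, cdc_table, and cdc_scn fields, so the CDC details survive when a non-CRUD target writes only record fields.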

Non-CDC source to CRUD targets

When reading data from a non-CDC source, you can use an Expression Evaluator processor or any scripting processor to define the sdc.operation.type header attribute.

For example, say you want to read from a transactional database table and keep a dimension table in sync with the changes. You'd use the JDBC Query Consumer to read the source table and a JDBC Lookup processor to check the dimension table for the primary key value of each record. The output of the lookup processor tells you whether the table contains a matching record. Using an Expression Evaluator, you set the sdc.operation.type record header attribute to 3 to update records that had a matching record, and to 1 to insert new records.

When you pass the records to the JDBC Producer target, the target uses the operation in the sdc.operation.type header attribute to determine how to write the records to the dimension table.
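
The tagging step in this example can be sketched in Python as follows. It is a simplified stand-in for the Expression Evaluator configuration, not its actual expression syntax. The function name tag_operation and the lookup_match_found flag are hypothetical, and storing the code as a string is an assumption.

```python
def tag_operation(headers, lookup_match_found):
    """Return a copy of the headers with sdc.operation.type set.

    3 (UPDATE) when the lookup found a matching row in the dimension table,
    1 (INSERT) when it did not.
    """
    tagged = dict(headers)
    tagged["sdc.operation.type"] = "3" if lookup_match_found else "1"
    return tagged
```

A record whose primary key already exists in the dimension table is tagged with 3, and a record with a new key is tagged with 1, which is the value the target reads when it writes the record.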