PostgreSQL CDC Client
The PostgreSQL CDC Client source processes Write-Ahead Logging (WAL) data to generate change data capture records for a PostgreSQL database. To generate change data capture records for an Amazon Aurora PostgreSQL database, use the Aurora PostgreSQL CDC Client source. For information about supported versions, see Supported systems and versions.
You might use this source to perform database replication. You can use a separate flow with the JDBC Query Consumer or JDBC Multitable Consumer source to read existing data. Then start a flow with the PostgreSQL CDC Client source to process subsequent changes.
You configure the contents of the records that the PostgreSQL CDC Client source generates. The source can generate a single record from either each transaction or each operation. The record contents affect the record header attributes, which determine whether additional processing is needed before passing the records to CRUD-enabled targets. For an overview of Data Collector changed data processing and a list of CRUD-enabled targets, see Processing changed data.
Because the source uses the wal2json output plugin for logical decoding, you must configure the wal2json format, which determines how the source reads data. The wal2json format and the record contents affect the memory that the source and database use.
When you configure the PostgreSQL CDC Client, you configure the change capture details, such as the schema and tables to read from, the initial change to use, and the operations to include. You can also use a connection to configure the source.
You define the name for the replication slot to be used, and specify whether to remove replication slots on close. You can also specify the behavior when the source encounters an unsupported data type and include the data for those fields in the record as unparsed strings. When the source database has high-precision timestamps, you can configure the source to write string values rather than datetime values to maintain the precision.
To determine how the source connects to the database, you specify connection information, a query interval, number of retries, and any custom JDBC configuration properties that you need. You can configure advanced connection properties.
You can also configure the source to use a secure connection to the database server using SSL/TLS encryption.
Before you configure the source, you must complete the prerequisites.
PostgreSQL prerequisites
Before you configure the PostgreSQL CDC Client source, complete the following prerequisites in the PostgreSQL database:
- Install the logical decoder.
- Assign the required role.
Install the logical decoder
To enable the PostgreSQL CDC Client source to read Write-Ahead Logging (WAL) change data capture information, you must install the wal2json logical decoder. Install wal2json on every PostgreSQL instance being monitored for changes.
The wal2json logical decoder is available on GitHub.
Assign the required role
Reading WAL change data capture data from PostgreSQL requires that the user specified in the stage or connection has the replication or superuser role.
You can specify the default user created with the database, named postgres. Or you can create another database user, grant that user the replication or superuser role, and then specify that user in the stage or connection properties.
For details about PostgreSQL roles, see the PostgreSQL documentation.
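As a rough illustration of creating a dedicated user, the following sketch builds the SQL statement that a database administrator might run. The helper `replication_role_sql` and the role name `cdc_user` are hypothetical. The statement uses standard PostgreSQL `CREATE ROLE` syntax, and the password is intentionally left to be set separately.

```python
import re

def replication_role_sql(role_name):
    """Return SQL statements that create a login role with the REPLICATION attribute.

    Assumption: the role name is a simple lowercase identifier. Anything else is
    rejected rather than quoted, to keep the sketch short.
    """
    if not re.fullmatch(r"[a-z_][a-z0-9_]*", role_name):
        raise ValueError(f"unsupported role name: {role_name!r}")
    return [
        f"CREATE ROLE {role_name} WITH LOGIN REPLICATION;",
        # The password is not embedded here; set it separately,
        # for example interactively from a psql session.
    ]

for statement in replication_role_sql("cdc_user"):
    print(statement)
```

You would then specify the new user in the stage or connection properties.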
JDBC driver
When connecting to a PostgreSQL database, you do not need to install a JDBC driver. Data Collector includes the JDBC driver required for PostgreSQL.
Schema, table name, and exclusion patterns
When you configure the PostgreSQL CDC Client source, you specify the tables with the change capture data that you want to process. To specify the tables, you define the schema, a table name pattern, and an optional exclusion pattern.
When defining the schema and table name pattern, you can use SQL LIKE syntax to define a set of tables within a schema or across multiple schemas. For more information about valid patterns for the SQL LIKE syntax, see the PostgreSQL documentation.
When needed, you can also use a regular expression as an exclusion pattern to exclude a subset of tables from the larger set.
- Schema: sales
- Table Name Pattern: SALES%
- Exclusion Pattern: SALES.*-.
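To make the interaction between the table name pattern and the exclusion pattern concrete, here is a minimal Python sketch that applies the same kind of configuration to a list of table names. The functions `like_to_regex` and `select_tables` and the table names are hypothetical, and the sketch assumes that the exclusion pattern must match the whole table name; the source may apply the pattern differently.

```python
import re

def like_to_regex(pattern):
    """Translate a SQL LIKE pattern into an equivalent regular expression.

    % matches any run of characters and _ matches exactly one character.
    Everything else is matched literally. LIKE escape characters are not handled.
    """
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return "".join(parts)

def select_tables(tables, table_pattern, exclusion_pattern=None):
    """Keep tables that match the LIKE pattern and do not match the exclusion regex.

    Assumption: the exclusion regex is applied to the whole table name.
    """
    include = re.compile(like_to_regex(table_pattern), re.DOTALL)
    exclude = re.compile(exclusion_pattern) if exclusion_pattern else None
    return [
        t for t in tables
        if include.fullmatch(t) and not (exclude and exclude.fullmatch(t))
    ]

tables = ["SALES_EU", "SALES_US", "SALES_US-1", "SALES_EU-a", "ORDERS"]
print(select_tables(tables, "SALES%", r"SALES.*-."))
# ['SALES_EU', 'SALES_US']
```

Under these assumptions, the tables that start with SALES are selected, and those that end with a dash followed by a single character are dropped.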
Initial change
The initial change is the point in the Write-Ahead Logging (WAL) data where you want to start processing.
Note that PostgreSQL CDC Client processes only change capture data. If you need existing data, you might use a JDBC Query Consumer or a JDBC Multitable Consumer in a separate flow to read table data before you start a PostgreSQL CDC Client flow.
- From the latest change
- The source processes all changes that occur after you start the flow. If a replication slot exists, the source first processes all existing data in the replication slot.
- From a specified datetime
- The source processes all changes that occurred at the specified datetime and later. Use the following format: MM-DD-YYYY HH24:MI:SS.
- From a specified log sequence number (LSN)
- The source processes all changes that occurred in the specified LSN and later. When using the specified LSN, the source starts processing with the timestamp associated with the LSN. If the LSN cannot be found in the WAL data, the source continues reading from the next higher LSN that is available.
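The datetime format and the "next higher LSN" behavior can be sketched in Python as follows. The function names are hypothetical and the code only illustrates the rules described above; it is not how the source is implemented. The sketch relies on the fact that PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash.

```python
from datetime import datetime
from typing import List, Optional

def format_initial_datetime(moment: datetime) -> str:
    """Render a datetime as MM-DD-YYYY HH24:MI:SS (24-hour clock)."""
    return moment.strftime("%m-%d-%Y %H:%M:%S")

def parse_lsn(lsn: str) -> int:
    """Convert an LSN such as '0/167F498' to a 64-bit integer.

    The part before the slash holds the high 32 bits and the part after
    the slash holds the low 32 bits, both in hexadecimal.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def resolve_start_lsn(requested: str, available: List[str]) -> Optional[str]:
    """Return the requested LSN if present, else the next higher available one."""
    target = parse_lsn(requested)
    candidates = [lsn for lsn in available if parse_lsn(lsn) >= target]
    return min(candidates, key=parse_lsn) if candidates else None

print(format_initial_datetime(datetime(2022, 8, 2, 11, 56, 23)))
# 08-02-2022 11:56:23
print(resolve_start_lsn("0/167F500", ["0/167F498", "0/167F6C0", "1/0"]))
# 0/167F6C0
```

LSNs are compared numerically rather than as strings, because a plain string comparison would order values such as 0/FF and 0/100 incorrectly.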
Example
You want to process all existing data in the Sales schema and then capture changed data, writing all data to Google Bigtable. To do this, you create two flows.
To read the existing data from the schema, you use a flow with the JDBC Multitable Consumer source and the Google Bigtable target.
Once all existing data is read, you stop the JDBC Multitable Consumer flow and start a PostgreSQL CDC Client flow. Configure the initial change for the PostgreSQL CDC Client source to use a datetime or LSN that picks up after the first flow stops.
Memory usage
The format of the data that the source reads and the contents of the generated records affect the memory that the database and source use.
- Wal2json Format
- Sets the format requested from the wal2json plugin. This becomes the format of the data that the source reads. The format affects the memory used. Setting the format to Operation reduces the memory used, but note that this format requires wal2json plugin version 2.0 or later.
- Record Contents
- Sets the contents of the generated record. The source can generate a record for each transaction or a record for each operation. The contents of the records affect the amount of processing required for each batch that the source generates. Setting the contents to Transaction requires more processing and more memory use.
| Wal2json Format | Record Contents | Description |
|---|---|---|
| Operation | Operation | Source reads data presented as single operations and creates one record for each operation. Data Collector and the database consume the least memory. Recommended configuration. Requires using wal2json plugin version 2.0 or later. |
| Operation | Transaction | Not supported. Do not use. |
| Chunked Transaction | Operation | Source reads data presented in chunks of transactions and creates one record for each operation. Data Collector and the database consume less memory. Recommended configuration in systems that cannot upgrade to wal2json plugin version 2.0 or later. |
| Chunked Transaction | Transaction | Not supported. Do not use. |
| Transaction | Operation | Source reads data presented as transactions and creates one record for each operation. Data Collector and the database both consume memory. |
| Transaction | Transaction | Source reads data presented as transactions and creates one record for each transaction, which can contain multiple operations. Data Collector and the database consume the most memory. This is the default configuration. |
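The combinations in the table can be expressed as a small lookup, which might be handy when reviewing flow configurations in bulk. The following Python sketch is only an illustration: the names `SUPPORTED` and `check_combination` are hypothetical, and the plugin version is passed in as a (major, minor) tuple by the caller.

```python
# (Wal2json Format, Record Contents) -> whether the table lists it as supported.
SUPPORTED = {
    ("Operation", "Operation"): True,
    ("Operation", "Transaction"): False,
    ("Chunked Transaction", "Operation"): True,
    ("Chunked Transaction", "Transaction"): False,
    ("Transaction", "Operation"): True,
    ("Transaction", "Transaction"): True,
}

def check_combination(wal2json_format, record_contents, plugin_version):
    """Return a list of problems; an empty list means the combination is usable."""
    key = (wal2json_format, record_contents)
    if key not in SUPPORTED:
        raise ValueError(f"unknown combination: {key}")
    problems = []
    if not SUPPORTED[key]:
        problems.append("combination is not supported")
    # The Operation format requires wal2json plugin version 2.0 or later.
    if wal2json_format == "Operation" and plugin_version < (2, 0):
        problems.append("Operation format requires wal2json 2.0 or later")
    return problems

print(check_combination("Operation", "Operation", (2, 1)))
# []
print(check_combination("Chunked Transaction", "Transaction", (1, 0)))
# ['combination is not supported']
```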
SSL/TLS encryption
You can secure the connection to the PostgreSQL database by configuring the source to use SSL/TLS encryption.
Before configuring the source to use SSL/TLS encryption, verify that the database is correctly configured to use SSL/TLS. For more information, see the PostgreSQL documentation.
Define one of the following SSL/TLS modes that the source uses to connect to PostgreSQL:
- Disabled
- The stage does not establish an SSL/TLS connection.
- Required
- The stage establishes an SSL/TLS connection without any verification. The stage trusts the certificate and host name of the PostgreSQL server.
- Verify CA
- The stage establishes an SSL/TLS connection only after successfully verifying the certificate of the PostgreSQL server.
- Verify Full
- The stage establishes an SSL/TLS connection only after successfully verifying the certificate and host name of the PostgreSQL server.
-----BEGIN CERTIFICATE REQUEST-----
MIIB9TCCAWACAQAwgbgxGTAXBgNVHJoMEFF1b1ZlZGwzIEkpbWl0ZWLxHLAaBgNV
......
98TwDIK/39WEB/V607As+KoYajQL9drorw==
-----END CERTIFICATE REQUEST-----
For more information about the SSL/TLS modes available with PostgreSQL, see the PostgreSQL documentation.
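For orientation, the four modes resemble the `sslmode` values that the PostgreSQL JDBC driver accepts (`disable`, `require`, `verify-ca`, `verify-full`). The Python sketch below builds a JDBC connection string under the assumption that the stage modes map one-to-one to those values. That mapping, the function `jdbc_url`, and the host and database names are illustrative only; the stage may configure the driver differently.

```python
from urllib.parse import urlencode

# Assumed mapping from the stage's SSL/TLS modes to the PostgreSQL JDBC
# driver's sslmode values. This is an illustration, not the stage's code.
SSL_MODE_TO_JDBC = {
    "Disabled": "disable",
    "Required": "require",
    "Verify CA": "verify-ca",
    "Verify Full": "verify-full",
}

def jdbc_url(host, port, database, ssl_mode):
    """Build a PostgreSQL JDBC URL that carries the chosen sslmode."""
    query = urlencode({"sslmode": SSL_MODE_TO_JDBC[ssl_mode]})
    return f"jdbc:postgresql://{host}:{port}/{database}?{query}"

print(jdbc_url("db.example.com", 5432, "sales", "Verify Full"))
# jdbc:postgresql://db.example.com:5432/sales?sslmode=verify-full
```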
Record contents and generated record
- Transaction
- The source generates a record for each transaction, which can include multiple CRUD operations. Because each record can contain multiple operations, the source does not write the CRUD operations to the sdc.operation.type record header attribute. If you want to pass records to a CRUD-enabled target, you might use a scripting processor to convert the records as needed. Or, you might use a Field Pivoter processor and other processors to separate the data to create a record for each operation.
- Operation
- The source generates a record for each operation. Because each record contains a single operation, the source writes the CRUD operations to the sdc.operation.type record header attribute. CRUD-enabled targets can easily process generated records.
When you set Record Contents to Operation, generated records include fields that match the column names from the table.
When you set Record Contents to Transaction, generated records include the following fields:
| Field Name | Description |
|---|---|
| xid | Transaction ID. |
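| nextlsn | Next Log Sequence Number (LSN). |
| timestamp | Timestamp with sub-second granularity, including the time zone offset from UTC. |
| change | A list field that includes details about each data change. In the sample records, each entry includes kind (the operation type), schema, table, columnnames, columntypes, and columnvalues, with oldkeys (keynames, keytypes, keyvalues) for updates and deletes. |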
Record header attributes
The included record header attributes depend on the method that the PostgreSQL CDC Client source uses to generate records.
Transaction Based Records
| CDC Header Attribute | Description |
|---|---|
| postgres.cdc.lsn | Log Sequence Number (LSN) of this record. |
| postgres.cdc.xid | Transaction ID. |
| postgres.cdc.timestamp | Timestamp of transaction. |
Operation Based Records
- Change data capture information
- The source includes the sdc.operation.type record header attribute with the CRUD operation type. The source also includes change data capture information in the following record header attributes:
| CDC Header Attribute | Description |
|---|---|
| postgres.cdc.lsn | Log Sequence Number (LSN) of this record. |
| postgres.cdc.xid | Transaction ID. |
| postgres.cdc.timestamp | Timestamp of transaction. |
| postgres.cdc.table | Name of table. |
| postgres.cdc.schema | Name of schema. |
| postgres.cdc.operation | Operation type. |
- Primary key information
- The source includes the following record header attribute:
jdbc.primaryKeySpecification - Provides a JSON-formatted string that lists the columns that form the primary key in the table and the metadata for those columns.
For example, a table with a composite primary key contains the following attribute:
jdbc.primaryKeySpecification = {{"<primary key column 1 name>": {"type": <type>, "datatype": "<data type>", "size": <size>, "precision": <precision>, "scale": <scale>, "signed": <Boolean>, "currency": <Boolean> }}, ..., {"<primary key column N name>": {"type": <type>, "datatype": "<data type>", "size": <size>, "precision": <precision>, "scale": <scale>, "signed": <Boolean>, "currency": <Boolean> } } }
A table without a primary key contains the attribute with an empty value:
jdbc.primaryKeySpecification = {}
For an update operation on a table with a primary key, the source also includes the following record header attributes:
jdbc.primaryKey.before.<primary key column name> - Provides the old value for the specified primary key column.
jdbc.primaryKey.after.<primary key column name> - Provides the new value for the specified primary key column.
Note: The source provides the new and old values of the primary key columns regardless of whether the value changes.
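Because the before and after attributes are present even when the key is unchanged, downstream logic that cares about key changes has to compare them explicitly. The following Python sketch shows one way to do that on a plain dictionary of header attributes. The function names are hypothetical, and the sketch assumes the attribute value is a JSON object keyed by column name, as in the sample records in this topic.

```python
import json

def primary_key_columns(headers):
    """List the primary key columns named in jdbc.primaryKeySpecification."""
    spec = json.loads(headers.get("jdbc.primaryKeySpecification", "{}"))
    return list(spec)

def primary_key_changed(headers):
    """Return True if any primary key column has different before and after values."""
    for column in primary_key_columns(headers):
        before = headers.get(f"jdbc.primaryKey.before.{column}")
        after = headers.get(f"jdbc.primaryKey.after.{column}")
        if before != after:
            return True
    return False

# Header attributes modeled on an update record with an unchanged key.
headers = {
    "sdc.operation.type": "3",
    "jdbc.primaryKeySpecification": '{"id":{"type":4,"datatype":"INTEGER","size":11,'
                                    '"precision":10,"scale":0,"signed":true,"currency":false}}',
    "jdbc.primaryKey.before.id": "1",
    "jdbc.primaryKey.after.id": "1",
}
print(primary_key_columns(headers))
# ['id']
print(primary_key_changed(headers))
# False
```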
Sample records
Suppose the input data has a transaction with three different operations: insert, update, and delete. The generated records and record header attributes differ depending on how you configure the record contents of the PostgreSQL CDC Client source.
Sample transaction based records
Header:
{
"postgres.cdc.timestamp":"2022-08-02 11:56:23.378604+02",
"postgres.cdc.xid":"528",
"postgres.cdc.lsn":"0/167F498"
}
Fields:
{
"xid":528,
"nextlsn":"0/167F6C0",
"timestamp":"2022-08-02 11:56:23.378604+02",
"change":[
{
"kind":"insert",
"schema":"public",
"table":"mytable",
"columnnames":[
"id",
"name"
],
"columntypes":[
"integer",
"character varying(20)"
],
"columnvalues":[
0,
"Jones"
]
},
{
"kind":"update",
"schema":"public",
"table":"mytable",
"columnnames":[
"id",
"name"
],
"columntypes":[
"integer",
"character varying(20)"
],
"columnvalues":[
1,
"Smith"
],
"oldkeys":{
"keynames":[
"id"
],
"keytypes":[
"integer"
],
"keyvalues":[
1
]
}
},
{
"kind":"delete",
"schema":"public",
"table":"mytable",
"oldkeys":{
"keynames":[
"id"
],
"keytypes":[
"integer"
],
"keyvalues":[
2
]
}
}
]
}
Sample operation based records
- Record 1:
Header:
{
"sdc.operation.type":"1",
"postgres.cdc.table":"mytable",
"postgres.cdc.schema":"public",
"postgres.cdc.lsn":"0/167F498",
"jdbc.primaryKeySpecification":"{\"id\":{\"type\":4,\"datatype\":\"INTEGER\",\"size\":11,\"precision\":10,\"scale\":0,\"signed\":true,\"currency\":false}}",
"postgres.cdc.timestamp":"2022-08-02 11:56:23.378604+02",
"postgres.cdc.xid":"528",
"postgres.cdc.operation":"I"
}
Fields:
{
"name":"Jones",
"id":0
}
- Record 2:
Header:
{
"sdc.operation.type":"3",
"postgres.cdc.table":"mytable",
"postgres.cdc.schema":"public",
"postgres.cdc.lsn":"0/167F498",
"jdbc.primaryKeySpecification":"{\"id\":{\"type\":4,\"datatype\":\"INTEGER\",\"size\":11,\"precision\":10,\"scale\":0,\"signed\":true,\"currency\":false}}",
"postgres.cdc.timestamp":"2022-08-02 11:56:23.378604+02",
"jdbc.primaryKey.before.id":"1",
"jdbc.primaryKey.after.id":"1",
"postgres.cdc.xid":"528",
"postgres.cdc.operation":"U"
}
Fields:
{
"name":"Smith",
"id":1
}
- Record 3:
Header:
{
"sdc.operation.type":"2",
"postgres.cdc.table":"mytable",
"postgres.cdc.schema":"public",
"postgres.cdc.lsn":"0/167F498",
"jdbc.primaryKeySpecification":"{\"id\":{\"type\":4,\"datatype\":\"INTEGER\",\"size\":11,\"precision\":10,\"scale\":0,\"signed\":true,\"currency\":false}}",
"postgres.cdc.timestamp":"2022-08-02 11:56:23.378604+02",
"postgres.cdc.xid":"528",
"postgres.cdc.operation":"D"
}
Fields:
{
"id":2
}
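To relate the two sample shapes, the following Python sketch splits a transaction based record into one record per operation, roughly what a scripting processor might do before a CRUD-enabled target. The function `split_transaction` and the output layout are hypothetical, and only a subset of the header attributes is produced. The operation codes are the ones shown in the sample operation based records.

```python
# Operation codes as they appear in the sample operation based records.
OPERATION_CODES = {"insert": "1", "delete": "2", "update": "3"}

def split_transaction(record):
    """Turn one transaction based record into a list of per-operation records."""
    results = []
    for change in record["change"]:
        if "columnnames" in change:
            # Inserts and updates list the column names and the new values.
            fields = dict(zip(change["columnnames"], change["columnvalues"]))
        else:
            # Deletes in the sample carry only the old key values.
            old_keys = change["oldkeys"]
            fields = dict(zip(old_keys["keynames"], old_keys["keyvalues"]))
        header = {
            "sdc.operation.type": OPERATION_CODES[change["kind"]],
            "postgres.cdc.schema": change["schema"],
            "postgres.cdc.table": change["table"],
            "postgres.cdc.xid": str(record["xid"]),
            "postgres.cdc.timestamp": record["timestamp"],
        }
        results.append({"header": header, "fields": fields})
    return results

# Trimmed version of the sample transaction based record.
sample = {
    "xid": 528,
    "nextlsn": "0/167F6C0",
    "timestamp": "2022-08-02 11:56:23.378604+02",
    "change": [
        {"kind": "insert", "schema": "public", "table": "mytable",
         "columnnames": ["id", "name"], "columnvalues": [0, "Jones"]},
        {"kind": "update", "schema": "public", "table": "mytable",
         "columnnames": ["id", "name"], "columnvalues": [1, "Smith"],
         "oldkeys": {"keynames": ["id"], "keyvalues": [1]}},
        {"kind": "delete", "schema": "public", "table": "mytable",
         "oldkeys": {"keynames": ["id"], "keyvalues": [2]}},
    ],
}

for rec in split_transaction(sample):
    print(rec["header"]["sdc.operation.type"], rec["fields"])
# 1 {'id': 0, 'name': 'Jones'}
# 3 {'id': 1, 'name': 'Smith'}
# 2 {'id': 2}
```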
Configuring a PostgreSQL CDC Client source
About this task
Configure a PostgreSQL CDC Client source to process WAL change data capture data from a PostgreSQL database.
Before you configure the source, complete the prerequisite tasks.