Adding headers to a Kafka producer record
You can configure all integrated Kafka custom operation processors to include one or more headers in every producer record that is sent to Kafka.
You can add headers that contain the following data:
- The values of any number of journal control fields.
- The values of any number of source table columns.
- The Avro schema for a Kafka record's key and value.
- The name of a source table.
- The name of a source table schema.
Procedure
Use the header.jcfs property to provide a comma-separated list of journal
control fields that you want to include as headers. One header is added for every journal control
field that you specify. For example:
header.jcfs=ENTTYP,TIMSTAMP,CCID
The key of the header is equal to the name of a journal control field prefixed with the
A_ string. For example, for the ENTTYP journal control field, the
key is equal to A_ENTTYP. The value comes from an after image.
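On the consumer side, a journal control field header can be looked up by its A_-prefixed key and decoded from UTF-8 bytes. The following is a minimal, self-contained sketch; the class name JcfHeaderDemo and the simulated header map are illustrative (in a real consumer the bytes come from record.headers()):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class JcfHeaderDemo {
    // Builds the header key for a journal control field, following the
    // documented convention: the field name prefixed with "A_".
    public static String jcfHeaderKey(String jcfName) {
        return "A_" + jcfName;
    }

    public static void main(String[] args) {
        // Simulated headers as they would arrive in record.headers();
        // real values are UTF-8 encoded byte arrays from the after image.
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put(jcfHeaderKey("ENTTYP"), "PT".getBytes(StandardCharsets.UTF_8));
        headers.put(jcfHeaderKey("CCID"), "12345".getBytes(StandardCharsets.UTF_8));

        byte[] raw = headers.get("A_ENTTYP");
        String entryType = (raw == null)
                ? "not provided"
                : new String(raw, StandardCharsets.UTF_8);
        System.out.println(entryType); // prints PT
    }
}
```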
Use HEADER_COLUMNS_ as a prefix to a CDC Replication-generated topic name to
provide a comma-separated list of source table columns whose values you want to include as headers.
One header is added for every source table column that you specify. For example:
HEADER_COLUMNS_instance.source-identifier.sourcedb.schema.table=COLUMN1,COLUMN2
The key of the header is equal to the name of a column. The column names are case sensitive.
If you want to include column values for more than one source table, then specify multiple
HEADER_COLUMNS_* properties. For example:
HEADER_COLUMNS_instance.source-identifier.sourcedb.schema.table1=COLUMN1,COLUMN2
HEADER_COLUMNS_instance.source-identifier.sourcedb.schema.table2=COLUMN3
HEADER_COLUMNS_topic-prefix.schema.table1=COLUMN1,COLUMN2
Use the headers.include.schema property to include the Avro schema for a Kafka
record's key and value as headers. Two headers are added. For example:
headers.include.schema=true
The keys for the headers are KeyRecordSchema and
ValueRecordSchema.
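A consumer can recover the schema JSON by decoding the UTF-8 bytes of these headers. The sketch below is illustrative (the class name SchemaHeaderDemo and the simulated header map are not part of the product); with Apache Avro on the classpath, the decoded JSON could then be parsed into a Schema object, as noted in the comment:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SchemaHeaderDemo {
    // Decodes a schema header's UTF-8 bytes into the Avro schema JSON string.
    public static String schemaJson(byte[] headerValue) {
        return (headerValue == null)
                ? null
                : new String(headerValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulated headers under the documented keys.
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("ValueRecordSchema",
            "{\"type\":\"record\",\"name\":\"row\",\"fields\":[]}"
                .getBytes(StandardCharsets.UTF_8));

        String json = schemaJson(headers.get("ValueRecordSchema"));
        // With Avro available, the JSON string can be turned into a Schema:
        // org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(json);
        System.out.println(json);
    }
}
```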
Use the headers.include.table.name property to include the name of a source
table as a header. For example:
headers.include.table.name=true
The key for the header is TableName.
Use the headers.include.table.schema property to include the name of a source
table schema as a header. For example:
headers.include.table.schema=true
The key for the header is TableSchema.
Use the headers.include.sourceid property to include
source-identifier as a Kafka header (by default, source-identifier
is the first eight characters of the subscription name and is used in the default Kafka
topic name). For example:
headers.include.sourceid=true
Use the header.prefix property to add a prefix to the keys of all the headers
that are generated by an integrated KCOP. For example:
header.prefix=IBM_
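With a prefix configured, a consumer can separate KCOP-generated headers from any others by matching on the prefix. This sketch assumes the IBM_ prefix from the example above; the class name HeaderPrefixDemo and the simulated header map are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderPrefixDemo {
    // Keeps only headers whose keys carry the configured prefix,
    // and strips the prefix from the returned keys.
    public static Map<String, byte[]> kcopHeaders(Map<String, byte[]> headers,
                                                  String prefix) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("IBM_TableName", "TABLE1".getBytes(StandardCharsets.UTF_8));
        headers.put("trace-id", "abc".getBytes(StandardCharsets.UTF_8));

        Map<String, byte[]> kcop = kcopHeaders(headers, "IBM_");
        System.out.println(kcop.keySet()); // prints [TableName]
    }
}
```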
- The KcopSingleRowAvroAuditHeadersIntegratedKCOP is deprecated starting with
Version 11.4.0.4-5602, which adds the functionality that is described above for all
integrated KCOPs.
- If a topic name that is generated by CDC Replication includes non-ASCII
characters, you must escape these characters by using the Unicode escape syntax. For example, for
the topic name
instance.source-identifier.sourcedb.§schema.table, you put the following entry in a KCOP
properties file:
HEADER_COLUMNS_instance.source-identifier.sourcedb.\u00a7schema.table=COLUMN1,COLUMN2
where 00a7 is the Unicode code point of the § character.
- All values except the values of columns with binary data types are first converted to
strings and then encoded with the UTF-8 encoding.
- The following example shows the sample code that decodes and prints the headers. For
the sake of simplicity, the values of binary columns are printed as strings.
for (ConsumerRecord<byte[], byte[]> record : records) {
    StringBuilder headersString = new StringBuilder();
    for (Header header : record.headers()) {
        try {
            headersString.append(
                "Key: " + header.key()
                + " Value: "
                + (header.value() != null
                    ? new String(header.value(), "UTF-8")
                    : "not provided")
                + "\n");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    System.out.println("\noffset=" + record.offset()
        + "\nHeader(s) = \n" + headersString.toString());
}