Adding headers to a Kafka producer record

You can configure all integrated Kafka custom operation processors to include one or more headers in every producer record that is sent to Kafka.

The headers with the following data can be added:

  • The values of any number of journal control fields.
  • The values of any number of source table columns.
  • The Avro schema for a Kafka record's key and value.
  • The name of a source table.
  • The name of a source table schema.

Procedure

Use the header.jcfs property to provide a comma-separated list of journal control fields that you want to include as headers. One header is added for every journal control field that you specify. For example:

header.jcfs=ENTTYP,TIMSTAMP,CCID

The key of the header is equal to the name of a journal control field prefixed with the A_ string. For example, for the ENTTYP journal control field, the key is equal to A_ENTTYP. The value comes from an after image.

Use HEADER_COLUMNS_ as a prefix to a CDC Replication-generated topic name to provide a comma-separated list of source table columns whose values you want to include as headers. One header is added for every source table column that you specify. For example:

HEADER_COLUMNS_instance.source-identifier.sourcedb.schema.table=COLUMN1,COLUMN2

The key of the header is equal to the name of a column. The column names are case sensitive.

If you want to include column values for more than one source table, then specify multiple HEADER_COLUMNS_* properties. For example:

HEADER_COLUMNS_instance.source-identifier.sourcedb.schema.table1=COLUMN1,COLUMN2
HEADER_COLUMNS_instance.source-identifier.sourcedb.schema.table2=COLUMN3
Setting the topic prefix in Management Console by using Kafka properties changes the default topic naming convention. If you set the topic prefix, ensure that you use the following format for HEADER_COLUMNS:
HEADER_COLUMNS_topic-prefix.schema.table1=COLUMN1,COLUMN2

Use the headers.include.schema property to include the Avro schema for a Kafka record's key and value as headers. Two headers are added. For example:

headers.include.schema=true

The keys for the headers are KeyRecordSchema and ValueRecordSchema.

Use the headers.include.table.name property to include the name of a source table as a header. For example:

headers.include.table.name=true

The key for the header is TableName.

Use the headers.include.table.schema property to include the name of a source table schema as a header. For example:

headers.include.table.schema=true

The key for the header is TableSchema.

Use the headers.include.sourceid property to add source-identifier as a Kafka header (by default, source-identifier is the first eight characters of the subscription name and is used in the default name of that Kafka topic). For example: headers.include.sourceid=true.

Use the header.prefix property to add a prefix to the keys of all the headers that are generated by an integrated KCOP. For example:

header.prefix=IBM_
Important:
  • The KcopSingleRowAvroAuditHeadersIntegrated KCOP is deprecated starting with Version 11.4.0.4-5602, which adds the functionality that is described above for all integrated KCOPs.
  • If a topic name that is generated by CDC Replication includes non-ASCII characters, you must escape these characters by using the Unicode escape syntax. For example, for the topic name instance.source-identifier.sourcedb.§schema.table, you put the following entry in a KCOP properties file:
    HEADER_COLUMNS_instance.source-identifier.sourcedb.\u00a7schema.table=COLUMN1,COLUMN2

    Where 00a7 is the Unicode code point of the § character.

  • All values except the values of columns with binary data types are first converted to strings and then encoded with the UTF-8 encoding.
  • The following example shows the sample code that decodes and prints the headers. For the sake of simplicity, the values of binary columns are printed as strings.
    for (ConsumerRecord<byte[], byte[]> record : records)
          {
             StringBuilder headersString =newStringBuilder();
             for (Header header : record.headers())
             {
                try
                {
                   headersString
                      .append(
                         "Key: "+ header.key() +" Value: "+ (header.value() !=null?newString(header.value(), "UTF-8") :"not provided")
                            +"\n");
                }
                catch (UnsupportedEncodingException e)
                {
                   e.printStackTrace();
                }
             }
             System.out.println("\noffset="+ record.offset() +"\nHeader(s) = \n"+ headersString.toString());
          }