Enabling a KCOP to write audit records in Avro format

The KcopMultiRowAvroLiveAuditIntegrated Kafka custom operation processor can write audit records in Avro format and register the schema in a Confluent schema registry or a Hortonworks Schema Registry Service.

About this task

You can use this KCOP to write audit records to Kafka as Avro generic records. Before images and after images are written as separate records. The KCOP registers the record schemas in a schema registry. You can optionally provide a properties file that specifies which journal control fields to include in the audit records, as well as the schema registry URL. For full details, see the API reference in Javadoc format.
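
The schema itself is generated by the KCOP from the source table and the journal control fields that you select, so you do not write it by hand. Purely as an illustration of the general shape, the following sketch uses Avro's SchemaBuilder to build a comparable schema; the table and field names are taken from the sample output later in this topic and are assumptions, not the schema the KCOP produces.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class AuditSchemaSketch {
        public static void main(String[] args) {
            // Illustrative only: an audit record pairs the replicated row's
            // columns with the selected journal control fields (A_ prefix).
            Schema value = SchemaBuilder.record("TAB1").fields()
                    .requiredInt("I1")
                    .requiredInt("I2")
                    .requiredInt("I3")
                    .name("V1").type().unionOf().nullType().and().stringType()
                        .endUnion().nullDefault()
                    .requiredString("A_ENTTYP")   // entry type: PT, UB, UP, DL
                    .requiredString("A_TIMSTAMP") // journal timestamp
                    .requiredString("A_USER")
                    .requiredString("A_JOBUSER")
                    .endRecord();
            System.out.println(value.toString(true));
        }
    }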

Procedure

  1. In Management Console, click Configuration > Subscriptions.
  2. Select the subscription.
  3. Right-click the subscription and select Kafka Properties.
  4. Verify that Zookeeper is selected as the method for Kafka apply.
  5. Click OK.
  6. Right-click the subscription and select User Exit.
  7. Enter values for the following fields:
    Class Name
    com.datamirror.ts.target.publication.userexit.sample.kafka.KcopMultiRowAvroLiveAuditIntegrated
    Parameters
    You can specify a URL or a properties file name and path:
    URL
    protocol://schema registry host name:port[/version]

    The version is optional. For example, for replication to Confluent Platform, http://9.12.219.214:8081 is sufficient, whereas a Hortonworks schema registry URL sometimes includes the version number, for example http://9.12.219.214:8081/api/v1.

    Properties file name and path
    -file:full path to properties file name

    You might want to use the convention CDC_Kafka_installation_directory/conf/filename.properties.

    The following example shows the contents of a properties file:

    schema.registry.url=http://9.12.219.214:8081
    audit.jcfs=ENTTYP,CCID
    before.update.record.mode=ALWAYS
    default.null.JCF.value=""
    null.UB.ENTTYP.override="UB"
  8. Click OK.
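
    After the subscription has applied some changes, you can optionally confirm that the KCOP registered its schemas. The following sketch lists the subjects in a Confluent schema registry through its REST API (GET /subjects); it assumes Java 11 or later, and the registry URL is an example. Note that the Hortonworks Schema Registry Service exposes a different REST API.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ListRegisteredSubjects {
        public static void main(String[] args) throws Exception {
            // GET /subjects returns the names of all registered schemas,
            // for example ["topic_name-key","topic_name-value"].
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest
                    .newBuilder(URI.create("http://9.12.219.214:8081/subjects"))
                    .GET()
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        }
    }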

    Sample output

    $ db2 "insert into tab1 values (8,8,8, 'Tab1 data')"
    DB20000I The SQL command completed successfully.
    $ db2 "update tab1 set I1 = 9, I2 = 9 where I1 = 8"
    DB20000I The SQL command completed successfully.
    $ db2 "delete from tab1 where I1 = 9"
    DB20000I The SQL command completed successfully.
    

    Use the Avro console consumer that is provided with Confluent Platform to read the records:

    kafka-avro-console-consumer --zookeeper localhost:2181 --property print.key=true --topic topic_name --from-beginning
    
    {"I2":8,"I3":8} 
    {"I1":8,"I2":8,"I3":8,"V1":{"string":"Tab1 data"},"A_ENTTYP":"PT","A_TIMSTAMP":"2017-11-24 10:49:53.000000000000","A_USER":"USER ","A_JOBUSER":"USER "}
    
    {"I2":8,"I3":8}
     {"I1":8,"I2":8,"I3":8,"V1":{"string":"Tab1 data"},"A_ENTTYP":"UB","A_TIMSTAMP":"2017-11-24 10:49:57.000000000000","A_USER":"USER ","A_JOBUSER":"USER "}
    
    {"I2":9,"I3":8}
     {"I1":9,"I2":9,"I3":8,"V1":{"string":"Tab1 data"},"A_ENTTYP":"UP","A_TIMSTAMP":"2017-11-24 10:49:57.000000000000","A_USER":"USER ","A_JOBUSER":"USER "}
    
    {"I2":9,"I3":8}
     {"I1":9,"I2":9,"I3":8,"V1":{"string":"Tab1 data"},"A_ENTTYP":"DL","A_TIMSTAMP":"2017-11-24 10:50:01.000000000000","A_USER":"USER ","A_JOBUSER":"USER "}