Integrating the gateway with Kafka

The Message Bus Gateway can be configured to integrate with a Kafka server as an event producer.

The Message Bus Gateway connects to the Kafka server using the Kafka transport. The Kafka transport depends on the following library files:

  • zookeeper-3.4.8.jar
  • kafka-clients-0.10.0.1.jar

Check the Apache Kafka compatibility matrix to confirm that the target Kafka system is supported by these dependency versions.

Configuring the Message Bus Gateway to publish Kafka events

The following configuration files are supplied with the gateway for the integration with Kafka:
  • G_XML.props
  • kafkaTransport.properties
  • kafkaConnectionProperties.json
  • kafkaClient.properties

To configure the Message Bus Gateway to publish Kafka events, complete the following steps:

  1. Install/update the Message Bus Gateway. See Installing the gateway.
  2. Edit the gateway configuration in the following gateway properties file:

    $OMNIHOME/gates/<arch>/G_XML.props

    Where <arch> is the architecture directory, for example linux2x86.

  3. Update the following property values, setting the transport file path as appropriate:
    Gate.XMLGateway.TransportFile       : '$OMNIHOME/java/conf/kafkaTransport.properties'
    Gate.XMLGateway.TransportType       : 'KAFKA'

  4. Configure the Kafka transport properties.
    1. Edit the Kafka transport configuration in the following transport properties file:

      $OMNIHOME/java/conf/kafkaTransport.properties

    2. Update the following property values, setting the connection properties file path as appropriate (a complete sketch of this file is shown after these steps):
      kafkaClientMode=PRODUCER
      connectionPropertiesFile=$OMNIHOME/java/conf/kafkaConnectionProperties.json
      
  5. For descriptions of the Kafka transport properties, see the Kafka transport properties table.
  6. Configure the Kafka connection properties.

    Kafka connection properties are defined in the kafkaConnectionProperties.json file. This file contains the following properties:

    {
        "zookeeper_client" : 
            {
                "target" : "",
                "properties" : "",
                "java_sys_props" : "",
                "topic_watch": true,
                "broker_watch": true
            },
        "brokers" : "",
        "topics": "",
        "kafka_client" : 
            {
                "properties" : "",
                "java_sys_props" : ""
            }
    }
    1. Within the sample configuration file supplied with the gateway, update the path to the ZooKeeper client properties file.
      "zookeeper_client" : 
        {
            "target" : "localhost:2181",
            "properties" : "<Path & File of zookeeper client properties file>",
            "java_sys_props" : "",
            "topic_watch": true,
            "broker_watch": true
        },
    2. Update the path to the Kafka client properties file:
        "brokers" : "PLAINTEXT://localhost:9092",
        "topics": "topicABC,topicXYZ",
        "kafka_client" : 
          {
            "properties" : "<full_omnihome_path>/java/conf/kafkaClient.properties",
            "java_sys_props" : ""	
          }
  7. For descriptions of the Kafka connection properties, see the Kafka connection properties table.
  8. Configure the Kafka client properties.
    1. Edit the Kafka client configuration in the following properties file:

      $OMNIHOME/java/conf/kafkaClient.properties

    2. Uncomment the common Kafka client properties and the producer-specific properties in the kafkaClient.properties file.
    3. For details of the configuration required by the connection protocols supported by the Kafka producer, see Kafka configuration for different connection protocols.
    Note: The default setting for the Kafka producer’s key.serializer and value.serializer is org.apache.kafka.common.serialization.StringSerializer. The current Kafka transport does not support any serializer class other than org.apache.kafka.common.serialization.StringSerializer; specifying a different serializer class causes failures in the producer’s send operation.
  9. Run the gateway using the following command:

    $OMNIHOME/bin/nco_g_xml -propsfile $OMNIHOME/etc/G_XML.props

    Note: If you run the gateway with the properties file in $OMNIHOME/etc, as in this example, first copy G_XML.props from $OMNIHOME/gates/xml to $OMNIHOME/etc.
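
For reference, the following is a minimal, illustrative sketch of the producer-side settings from steps 4 and 8. It combines the values shown in this procedure; the liveness properties are optional and are described in the Kafka transport properties table:

    # kafkaTransport.properties: run the transport as a Kafka producer
    kafkaClientMode=PRODUCER
    connectionPropertiesFile=$OMNIHOME/java/conf/kafkaConnectionProperties.json
    # Optional liveness-check settings (defaults shown)
    LivenessCriterion=TOPIC
    LivenessCheckInterval=20
    LivenessCheckTimeout=5
    LockSendUntilReconnection=TRUE

    # kafkaClient.properties: producer-specific serializer settings
    # (only StringSerializer is supported by the transport)
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer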

Kafka transport properties table

The following table describes the properties used to configure the kafkaTransport.properties file.

Table 1. Kafka transport properties

kafkaClientMode
    Use this property to set whether the transport runs as a Kafka consumer or a Kafka producer.
    This property takes the following values:
    CONSUMER: A Kafka consumer reads data from topics.
    PRODUCER: A Kafka producer writes data to topics.

connectionPropertiesFile
    Use this property to specify the JSON file that holds the Kafka connection properties.

LivenessCriterion
    Use this property to control the periodic Kafka liveness check feature.
    This property takes the following values:
    TOPIC: Enables the liveness check.
    NONE: Disables the liveness check.
    The default value is TOPIC.

LivenessCheckInterval
    Use this property to configure the interval (in seconds) at which the Kafka liveness check runs.
    The range of valid values is 10 to 1000 seconds.
    The default value is 20.

LivenessCheckTimeout
    Use this property to configure the time (in seconds) that the transport allows for the liveness check operation to complete. If the operation does not complete within this period, the transport regards it as a negative Kafka liveness status.
    The range of valid values is 5 to 60 seconds.
    The default value is 5.

LockSendUntilReconnection
    Use this property to configure the transport to lock a negative liveness status and stop message processing.
    This property takes the following values:
    TRUE: Locks the negative liveness status after a failed liveness check report.
    FALSE: Does not lock the negative liveness status. Use this value for debugging purposes only.
    For details about setting this property, see Kafka liveness test.
    The default value is TRUE.
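
For example, to run the liveness check every 60 seconds and allow each check 10 seconds to complete, you might set the following values in kafkaTransport.properties:

    LivenessCriterion=TOPIC
    LivenessCheckInterval=60
    LivenessCheckTimeout=10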

Kafka liveness test

In PRODUCER mode, the Kafka transport can be enabled to run the Kafka target liveness test periodically. This test queries the Kafka target for the metadata of the topics configured in the kafkaConnectionProperties.json file. The initial test is performed after the worker producer’s initialization, as proof of an established connection to the Kafka target. During startup, the gateway exits if the initial test fails.

Setting LivenessCriterion to TOPIC enables the periodic liveness check to be performed at the interval specified by the LivenessCheckInterval property. When the Kafka target is unreachable, or the topic does not exist, the query stalls. If the query returns no result within the period specified by the LivenessCheckTimeout property, the transport interrupts the unresponsive query and regards the test as failed.

The Kafka transport creates an ad hoc Kafka producer for every liveness check cycle to run the topic metadata query. This is because the KafkaProducer class’s partitionsFor() method is effective for testing connectivity only on the first call, which queries the remote system and then stores the result in the producer instance; a subsequent partitionsFor() call for the same topic accesses the local cache instead.
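
The following Java sketch illustrates this pattern, assuming a producerProps object that contains bootstrap.servers and the StringSerializer settings. It is a minimal sketch of the technique, not the transport’s actual implementation:

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class TopicLivenessCheck {

    // Returns true if the topic metadata can be fetched within the timeout.
    static boolean isAlive(Properties producerProps, String topic, long timeoutSeconds) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // A fresh producer per check ensures partitionsFor() queries the broker
        // instead of returning metadata cached by an earlier call.
        try (KafkaProducer<String, String> probe = new KafkaProducer<>(producerProps)) {
            Future<List<PartitionInfo>> query = executor.submit(() -> probe.partitionsFor(topic));
            List<PartitionInfo> partitions = query.get(timeoutSeconds, TimeUnit.SECONDS);
            return partitions != null && !partitions.isEmpty();
        } catch (TimeoutException stalled) {
            return false; // the query stalled: negative liveness status
        } catch (Exception e) {
            return false; // any other failure also counts as negative
        } finally {
            executor.shutdownNow(); // interrupt an unresponsive query, if any
        }
    }
}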

Setting LockSendUntilReconnection to TRUE locks a negative liveness status and stops message processing within the Kafka transport after a liveness check reports a negative status. You would only set the LockSendUntilReconnection property to FALSE for debugging purposes. Do not use this configuration in a production environment.

Note: Locking a negative liveness status prevents the operational problems that can occur when the gateway does not initiate a reconnection in time after the Kafka target restarts with a new IP address. If the transport is not renewed, subsequent liveness checks by ad hoc Kafka producers (which connect to the target at its new IP address) could mistakenly report a positive status that contradicts the failed state of the worker producer.

Kafka reconnection

The Kafka transport does not self-initiate a reconnection. During a phase of negative liveness, the gateway’s attempts to send through the transport fail with a TransportSendMsgException. In response to this exception, the gateway initiates a reconnection in the Kafka transport.

After a successful reconnection, before processing new messages, the Kafka producer will resend the messages that previously failed to reach the target.
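
The following hypothetical sketch illustrates this recovery flow. TransportSendMsgException is the exception named above; the transport interface and the pending-message queue are invented stand-ins for the gateway’s internals:

import java.util.ArrayDeque;
import java.util.Deque;

class ResendOnReconnect {

    private final Deque<String> pending = new ArrayDeque<>();

    void send(KafkaTransport transport, String message) throws TransportSendMsgException {
        pending.addLast(message); // keep the message until delivery succeeds
        try {
            flush(transport);
        } catch (TransportSendMsgException e) {
            // Negative liveness: the gateway responds by reconnecting the transport.
            transport.reconnect();
            // After reconnection, previously failed messages are resent first, in order.
            flush(transport);
        }
    }

    private void flush(KafkaTransport transport) throws TransportSendMsgException {
        while (!pending.isEmpty()) {
            transport.sendToKafka(pending.peekFirst()); // throws while liveness is negative
            pending.removeFirst();                      // remove only after a successful send
        }
    }

    // Minimal stand-ins so that the sketch is self-contained.
    interface KafkaTransport {
        void sendToKafka(String message) throws TransportSendMsgException;
        void reconnect();
    }

    static class TransportSendMsgException extends Exception {
    }
}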

Kafka connection properties table

The following table describes the properties used to configure the kafkaConnectionProperties.json file.

Table 2. Kafka connection properties

zookeeper_client.target
    Use this property to specify the ZooKeeper endpoint.
    When this property is empty, the transport does not initiate a connection to ZooKeeper.
    The default is empty.

zookeeper_client.properties
    Use this property to specify the path to a file holding ZooKeeper client properties in key-value format, for example: key=value
    The default is empty.

zookeeper_client.java_sys_props
    Use this property to specify the path to a file holding the ZooKeeper client Java system properties required for a secure connection.
    The default is empty.

zookeeper_client.topic_watch
    Use this property to enable the ZooKeeper topic watch service. Valid values are:
    true: Enable the ZooKeeper topic watch service.
    false: Disable the ZooKeeper topic watch service.
    The default is true.

zookeeper_client.broker_watch
    Use this property to enable the ZooKeeper broker watch service. Valid values are:
    true: Enable the ZooKeeper broker watch service.
    false: Disable the ZooKeeper broker watch service.
    The default is true.

brokers
    Use this property to specify broker endpoints in a comma-separated list, for example: "localhost:9092, localhost:9093, localhost:9094"
    The brokers must belong to the same cluster, managed by one ZooKeeper.
    The default is empty.

topics
    Use this property to specify topics in a comma-separated list, for example: "topic1, topic2, topic3"
    The default is empty.

kafka_client.properties
    Use this property to specify the path to a file holding Kafka client properties.
    The default is empty.

kafka_client.java_sys_props
    Use this property to specify the path to a file holding the Kafka client’s Java system properties required for a secure connection.
    The default is empty.

Configuring Kafka compression

The Kafka client JAR supports three compression codecs: gzip, lz4, and snappy. Enable a codec by setting the compression.type Kafka client property in the kafkaClient.properties file.
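
For example, to enable lz4 compression you might add the following line to kafkaClient.properties (compression.type is the standard Kafka producer property):

    compression.type=lz4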

lz4 or snappy compression also requires the following additional setup:

  1. Obtain the lz4 and snappy library jar file from Kafka installation directory: <kafka_installation_location>/libs

    The lz4 jar file is lz4-<version>.jar.

    The snappy jar file is snappy-java-<version>.jar.

  2. Copy the jar files to: $OMNIHOME/java/jars

  3. Add the full paths of the jar files to the $CLASSPATH environment variable.

    For example:

    export CLASSPATH=$CLASSPATH:$OMNIHOME/java/jars/lz4-<version>.jar:$OMNIHOME/java/jars/snappy-java-<version>.jar
    Note: The Gate.Java.ClassPath property is assumed to contain the $CLASSPATH token.

Enabling Kafka logging

For troubleshooting Kafka issues, enable the Kafka client’s Log4j debug logging with the following configuration in the $OMNIHOME/java/conf/log4j.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 Configuration for Kafka Transport -->
<Configuration status="debug">
  <Appenders>
    <File name="LOGFILE" fileName="<log_filename.log>">
      <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
    </File>
  </Appenders>
  <Loggers>
    <Root level="debug" >
      <!-- Increase the logging level if necessary  -->
      <AppenderRef ref="LOGFILE"/>
    </Root>
  </Loggers>
</Configuration>

Kafka configuration for different connection protocols

Kafka supports the following connection protocols:

  • SASL_PLAINTEXT
  • SASL_SSL

The following table describes the configuration required by each connection protocol.

Table 3. Connection protocol configuration

SASL_PLAINTEXT
    Kafka producer properties:

    security.protocol=SASL_PLAINTEXT

    Note: Must be combined with the SASL-specific configurations.

SASL_SSL
    Kafka producer properties:

    acks=all
    security.protocol=SASL_SSL

    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2

    ssl.truststore.location=<path>/<trust_store_file>
    ssl.truststore.password=<trust_store_password>
    ssl.truststore.type=JKS

    # Specify the SASL type (see Table 4)
    sasl.mechanism=PLAIN    (SASL: Kafka user access control)
       OR
    sasl.mechanism=GSSAPI   (SASL: Kerberos)
       OR
    sasl.mechanism=SCRAM    (SASL: SCRAM)

    Java system properties:

    java.security.auth.login.config=<path>/user_jaas.conf
    https.protocols=TLSv1.2

    Note: Must be combined with the SASL-specific configurations.
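
As an illustrative example, assuming the PLAIN mechanism, a SASL_PLAINTEXT producer combines the protocol setting with a SASL mechanism in the Kafka producer properties file and a JAAS login file referenced from the Java system properties file:

    # Kafka producer properties (kafkaClient.properties)
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN

    # Java system properties file
    java.security.auth.login.config=<path>/user_jaas.conf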

The following table describes SASL-specific configurations.

Table 4. SASL-specific configuration

SASL: Kafka user access control

Java system properties:

java.security.auth.login.config=<path>/user_jaas.conf

Example user_jaas.conf:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
  username="myUserName"
  password="myPassword";
};

SASL: Kerberos

Java system properties:

java.security.auth.login.config=<path>/user_jaas.conf
java.security.krb5.conf=<path>/krb5.conf

Example user_jaas.conf:

When using IBM JDK:

KafkaClient {
   com.ibm.security.auth.module.Krb5LoginModule required
   debug=true
   credsType=both
   useKeytab="<path>/kafka.keytab"
   principal="username/instance@realm";
};

When using Oracle JDK:

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   debug=true
   renewTicket=true
   serviceName="kafka"
   useKeyTab=true
   keyTab="<path>/kafka.keytab"
   principal="username/instance@realm";
};

Note: The generic format of a principal is username/instance@realm. Some organizations use a service name instead of a username, or omit the username:

principal="servicename/instance@realm"
principal="instance@realm"

Consult your organization’s administrator for principal information.

SASL: SCRAM

Java system properties:

java.security.auth.login.config=<path>/user_jaas.conf

Example user_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    tokenauth="true"
    username="<username>"
    password="<password>";
};
Note:

Kafka producer properties are configured in the file specified in the kafka_client.properties field.

Java system properties are configured in the file specified in the kafka_client.java_sys_props field.

Example configuration

kafkaConnectionProperties.json:
"kafka_client": {
    "properties": "/opt/IBM/tivoli/netcool/omnibus/java/conf/kafkaClient.properties",
    "java_sys_props": "/opt/IBM/tivoli/netcool/omnibus/java/conf/kafkaClient_javaSys.properties"
  }
kafkaClient_javaSys.properties:
java.security.auth.login.config=/opt/IBM/tivoli/netcool/omnibus/java/conf/kafka_client_jaas.conf
kafka_client_jaas.conf:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="alice"
    password="alice";
};

In the broker list configuration, a broker endpoint without a protocol prefix is assumed to use the protocol configured in the security.protocol property. If security.protocol is not configured, PLAINTEXT is assumed.
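
For example, in the hypothetical broker list below, broker1 uses the explicit SASL_SSL prefix, while broker2 falls back to the value of the security.protocol property (or PLAINTEXT if that property is not configured):

    "brokers" : "SASL_SSL://broker1:9093,broker2:9092"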