Integrating the gateway with Kafka
The Message Bus Gateway can be configured to integrate with a Kafka server as an event producer.
The Message Bus Gateway connects to the Kafka server using the Kafka transport. The Kafka transport depends on the following libraries:
- zookeeper-3.4.8.jar
- kafka-clients-0.10.0.1.jar
Check the Apache Kafka compatibility matrix to confirm that the target system supports these client library versions.
Configuring the Message Bus Gateway to publish Kafka events
The configuration uses the following files:
- G_XML.props
- kafkaTransport.properties
- kafkaConnectionProperties.json
- kafkaClient.properties
To configure the Message Bus Gateway to publish Kafka events, complete the following steps:
- Install/update the Message Bus Gateway. See Installing the gateway.
- Edit the gateway configuration in the following gateway properties file:
$OMNIHOME/gates/<arch>/G_XML.props
where <arch> is the architecture directory, for example linux2x86.
- Update the following property values with the appropriate path:
Gate.XMLGateway.TransportFile : '$OMNIHOME/java/conf/kafkaTransport.properties'
Gate.XMLGateway.TransportType : 'KAFKA'
- Configure the Kafka transport properties.
- Edit the Kafka transport configuration in the following transport properties file:
$OMNIHOME/java/conf/kafkaTransport.properties
- Update the following property values:
kafkaClientMode=PRODUCER
connectionPropertiesFile=$OMNIHOME/java/conf/kafkaConnectionProperties.json
- For descriptions of the Kafka transport properties, see the Kafka transport properties table.
- Configure the Kafka connection properties.
Kafka connection properties are defined in the kafkaConnectionProperties.json file. This file contains the following properties:
{
  "zookeeper_client" : {
    "target" : "",
    "properties" : "",
    "java_sys_props" : "",
    "topic_watch": true,
    "broker_watch": true
  },
  "brokers" : "",
  "topics": "",
  "kafka_client" : {
    "properties" : "",
    "java_sys_props" : ""
  }
}
- Within the sample configuration file supplied with the gateway, update the path to the ZooKeeper client properties file:
"zookeeper_client" : {
  "target" : "localhost:2181",
  "properties" : "<Path & File of zookeeper client properties file>",
  "java_sys_props" : "",
  "topic_watch": true,
  "broker_watch": true
},
- Update the path to the Kafka client properties file:
"brokers" : "PLAINTEXT://localhost:9092",
"topics": "topicABC,topicXYZ",
"kafka_client" : {
  "properties" : "<full_omnihome_path>/java/conf/kafkaClient.properties",
  "java_sys_props" : ""
}
- For descriptions of the Kafka connection properties, see the Kafka connection properties table.
- Configure the Kafka client properties.
- Edit the Kafka client configuration in the following properties file:
$OMNIHOME/java/conf/kafkaClient.properties
- Uncomment the common Kafka client properties and the producer specific properties in the kafkaClient.properties file.
- For details of the configuration required by the connection protocols supported by the Kafka producer, see Kafka configuration for different connection protocols.
Note: The default setting for the Kafka producer's key.serializer and value.serializer is org.apache.kafka.common.serialization.StringSerializer. The current Kafka transport does not support any Kafka producer that uses a serializer class other than org.apache.kafka.common.serialization.StringSerializer. Specifying any other serializer class causes issues in the producer's send operation.
- Run the gateway using the following command:
$OMNIHOME/bin/nco_g_xml -propsfile $OMNIHOME/etc/G_XML.props
Note: The gateway properties file must be copied from $OMNIHOME/gates/xml to $OMNIHOME/etc if the gateway is to be run from that directory.
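The steps above can be condensed into a shell sketch. Note that this writes only the Kafka-related properties; a real G_XML.props contains many more settings, and $OMNIHOME here defaults to a scratch directory (an assumption for safe experimentation) rather than a real installation root.

```shell
#!/bin/sh
# Sketch of the configuration edits above. $OMNIHOME normally points at
# the gateway installation root; here it defaults to a scratch directory
# so the sketch can be run safely for inspection.
OMNIHOME="${OMNIHOME:-/tmp/omnihome-sketch}"
mkdir -p "$OMNIHOME/etc" "$OMNIHOME/java/conf"

# Gateway properties (normally edited in $OMNIHOME/gates/<arch>/G_XML.props
# and copied to $OMNIHOME/etc before running the gateway).
cat > "$OMNIHOME/etc/G_XML.props" <<'EOF'
Gate.XMLGateway.TransportFile : '$OMNIHOME/java/conf/kafkaTransport.properties'
Gate.XMLGateway.TransportType : 'KAFKA'
EOF

# Transport properties: run as a producer and point at the connection file.
cat > "$OMNIHOME/java/conf/kafkaTransport.properties" <<'EOF'
kafkaClientMode=PRODUCER
connectionPropertiesFile=$OMNIHOME/java/conf/kafkaConnectionProperties.json
EOF

echo "configuration sketch written under $OMNIHOME"
```

The quoted heredocs keep $OMNIHOME literal inside the written files, matching the form the gateway expects in its properties files.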
Kafka transport properties table
The following table describes the properties used to configure the kafkaTransport.properties file.
| Property name | Description |
|---|---|
| kafkaClientMode | Use this property to set the transport to run as a Kafka consumer or a producer. This property takes the following values: CONSUMER: a Kafka consumer reads data from topics. PRODUCER: a Kafka producer writes data to topics. |
| connectionPropertiesFile | Use this property to specify the JSON file holding the Kafka connection properties. |
| LivenessCriterion | Use this property to control the periodic Kafka liveness check feature. Setting the value to TOPIC enables the periodic check; see Kafka liveness test. The default value is |
| LivenessCheckInterval | Use this property to configure the interval (in seconds) at which the Kafka liveness check runs. The range of valid values is 10 to 1000 seconds. The default value is |
| LivenessCheckTimeout | Use this property to configure the time (in seconds) that the transport allows for the liveness check operation to complete. If the operation does not complete within this period, the transport regards it as a negative Kafka liveness status. The range of valid values is 5 to 60 seconds. The default value is |
| LockSendUntilReconnection | Use this property to lock a negative liveness status and stop message processing after a failed liveness check. This property takes the values TRUE and FALSE. For details about setting this property, see Kafka liveness test. The default value is |
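Putting the transport properties together, a kafkaTransport.properties for a producer with the periodic liveness check enabled might look like the following. The interval and timeout values are illustrative choices within the documented ranges, not documented defaults.

```properties
# Run the transport as a Kafka producer.
kafkaClientMode=PRODUCER
connectionPropertiesFile=$OMNIHOME/java/conf/kafkaConnectionProperties.json

# Periodic liveness check: query topic metadata every 60 seconds and
# fail the check if no result arrives within 30 seconds.
LivenessCriterion=TOPIC
LivenessCheckInterval=60
LivenessCheckTimeout=30

# Lock a negative liveness status until reconnection (recommended
# outside of debugging).
LockSendUntilReconnection=TRUE
```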
Kafka liveness test
In PRODUCER mode, the Kafka transport can be enabled to run the Kafka target liveness test periodically. This test queries the Kafka target for metadata of the topics configured in the kafkaConnectionProperties.json file. The initial test is performed after the worker producer's initialization as proof of an established connection to the Kafka target. During startup, the gateway exits if this initial test fails.
Setting LivenessCriterion to TOPIC enables the periodic
liveness check to be performed at the interval specified by the
LivenessCheckInterval property. When the Kafka target is unreachable, or the
topic does not exist, the query stalls. If the query returns no result within the period specified
by the LivenessCheckTimeout property, the transport interrupts the unresponsive
query and regards the test as failed.
The Kafka transport creates an ad hoc Kafka producer for every liveness check cycle to run the topic metadata query. This is because the KafkaProducer class's partitionsFor() method is effective for testing connectivity only on its first call, which queries the remote system and stores the result in the producer instance; subsequent partitionsFor() calls for the same topic read the local cache instead.
Setting LockSendUntilReconnection to TRUE locks a negative
liveness status and stops message processing within the Kafka transport after a liveness check
reports a negative status. You would only set the LockSendUntilReconnection
property to FALSE for debugging purposes. Do not use this configuration in a
production environment.
Kafka reconnection
The Kafka transport does not self-initiate a reconnection. During a phase of negative liveness, the gateway's send attempts to the transport fail with a TransportSendMsgException. In response to this exception, the gateway initiates reconnection in the Kafka transport.
After a successful reconnection, before processing new messages, the Kafka producer will resend the messages that previously failed to reach the target.
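The transport's liveness test can be approximated by hand with the Kafka command-line tools, which is useful when diagnosing a stuck producer. The sketch below mirrors the metadata query and the LivenessCheckTimeout behavior; KAFKA_HOME, the broker endpoint, and topicABC are assumed values, and kafka-topics.sh is the standard tool shipped in a Kafka installation.

```shell
#!/bin/sh
# Manual approximation of the transport's liveness test: query topic
# metadata and treat a missing answer within 30 seconds as a failure.
# KAFKA_HOME, the broker endpoint, and topicABC are assumed values.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
if timeout 30 "$KAFKA_HOME/bin/kafka-topics.sh" \
      --bootstrap-server localhost:9092 \
      --describe --topic topicABC >/dev/null 2>&1
then
    echo "liveness: OK"
else
    echo "liveness: FAILED (broker unreachable or topic missing)"
fi
```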
Kafka connection properties table
The following table describes the properties used to configure the kafkaConnectionProperties.json file.
| Object | Property name | Description |
|---|---|---|
| zookeeper_client | target | Use this property to specify the ZooKeeper endpoint. When this property is empty, the transport does not initiate a connection to ZooKeeper. The default is empty. |
| zookeeper_client | properties | Use this property to specify the path to a file holding ZooKeeper client properties in key-value format. The default is empty. |
| zookeeper_client | java_sys_props | Use this property to specify the path to a file holding ZooKeeper client Java system properties required for a secure connection. The default is empty. |
| zookeeper_client | topic_watch | Use this property to enable the ZooKeeper topic watch service. Valid values are: true: enable the ZooKeeper topic watch service. false: disable the ZooKeeper topic watch service. The default is true. |
| zookeeper_client | broker_watch | Use this property to enable the ZooKeeper broker watch service. Valid values are: true: enable the ZooKeeper broker watch service. false: disable the ZooKeeper broker watch service. The default is true. |
| (top level) | brokers | Use this property to specify broker endpoints in a comma-separated list, for example: PLAINTEXT://localhost:9092. The brokers must belong to the same cluster managed by a ZooKeeper. The default is empty. |
| (top level) | topics | Use this property to specify topics in a comma-separated list, for example: topicABC,topicXYZ. The default is empty. |
| kafka_client | properties | Use this property to specify the path to a file holding Kafka client properties. The default is empty. |
| kafka_client | java_sys_props | Use this property to specify the path to a file holding the Kafka client's Java system properties required for a secure connection. The default is empty. |
Configuring Kafka compression
The Kafka client jar supports three compression codecs: gzip, lz4, and snappy. Enable a codec by setting the compression.type property in the kafkaClient.properties file.
lz4 or snappy compression also requires the following additional setup:
- Obtain the lz4 and snappy library jar files from the Kafka installation directory: <kafka_installation_location>/libs. The lz4 jar file is lz4-<version>.jar. The snappy jar file is snappy-java-<version>.jar.
- Copy the jar files to: $OMNIHOME/java/jars
- Add the full paths of the jar files to the $CLASSPATH environment variable. For example:
export CLASSPATH=$CLASSPATH:$OMNIHOME/java/jars/lz4-<version>.jar:$OMNIHOME/java/jars/snappy-java-<version>.jar
Note: Gate.Java.ClassPath is assumed to contain the $CLASSPATH token.
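For gzip, which the document's jar-copying steps do not apply to (gzip support is built into the JDK, so no extra library jars are needed), enabling compression reduces to a single property. The sketch below appends the setting; the scratch $OMNIHOME default is an assumption so the sketch can be run safely.

```shell
#!/bin/sh
# Enable gzip compression for the Kafka producer. Unlike lz4 and snappy,
# gzip needs no additional library jars, so no classpath changes apply.
# $OMNIHOME defaults to a scratch directory so the sketch runs safely.
OMNIHOME="${OMNIHOME:-/tmp/omnihome-sketch}"
mkdir -p "$OMNIHOME/java/conf"
printf 'compression.type=gzip\n' >> "$OMNIHOME/java/conf/kafkaClient.properties"
grep 'compression.type' "$OMNIHOME/java/conf/kafkaClient.properties"
```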
Enabling Kafka logging
For troubleshooting Kafka issues, enable the Kafka client’s Log4j debug logging with the following configuration in the $OMNIHOME/java/conf/log4j.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 Configuration for Kafka Transport -->
<Configuration status="debug">
<Appenders>
<File name="LOGFILE" fileName="<log_filename.log>">
<PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
</File>
</Appenders>
<Loggers>
<Root level="debug" >
<!-- Increase the logging level if necessary -->
<AppenderRef ref="LOGFILE"/>
</Root>
</Loggers>
</Configuration>
Kafka configuration for different connection protocols
Kafka supports three types of connection protocol, including SASL_PLAINTEXT and SASL_SSL.
The following table describes the configuration required by each connection protocol.
| Connection protocol | Configuration required |
|---|---|
| SASL_PLAINTEXT | Kafka producer properties. Note: Must combine with SASL-specific configurations. |
| SASL_SSL | Kafka producer properties. Java system properties. Note: Must combine with SASL-specific configurations. |
The following table describes SASL-specific configurations.
| SASL mechanism | Configuration required |
|---|---|
| Kafka user access control | Java system properties. Example user_jaas.conf. |
| Kerberos | Java system properties. Example user_jaas.conf, which differs when using the IBM JDK versus the Oracle JDK. Note: The generic format of a principal is username/instance@realm. Some organizations might use servicename instead of username, or omit username. Consult your organization administrator for principal information. |
| SCRAM | Java system properties. Example user_jaas.conf. |
Kafka producer properties are configured in the file specified in the kafka_client.properties field.
Java system properties are configured in the file specified in the kafka_client.java_sys_props field.
Example configuration
"kafka_client": {
"properties": "/opt/IBM/tivoli/netcool/omnibus/java/conf/kafkaClient.properties",
"java_sys_props": "/opt/IBM/tivoli/netcool/omnibus/java/conf/kafkaClient_javaSys.properties"
}java.security.auth.login.config=C:\IBM\Tivoli\Netcool\omnibus\java\conf
\kafka_client_jaas.confKafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="alice"
password="alice";
};In
broker list configuration, a broker endpoint without a protocol prefix
is assumed to be using the protocol configured in the security.protocol property.
An unconfigured security.protocol denotes PLAINTEXT.
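As an illustration of combining these pieces, a producer using SASL_PLAINTEXT with the PLAIN (user access control) mechanism might carry the following entries in kafkaClient.properties. The property names security.protocol, sasl.mechanism, key.serializer, and value.serializer are standard Kafka client properties; this fragment is a sketch, and pairs with a JAAS file referenced via java.security.auth.login.config as in the example configuration above.

```properties
# Kafka producer properties for a SASL_PLAINTEXT connection using the
# PLAIN mechanism. The broker endpoints themselves come from the
# "brokers" field in kafkaConnectionProperties.json.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

# The transport supports only the String serializer (see the note in
# the configuration steps above).
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```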