Apache Kafka connector (DataStage)

Use the Apache Kafka connector in DataStage® to read streams of events from Kafka topics and to write streams of events to them.

Prerequisite

Create the connection. For instructions, see Connecting to a data source in DataStage and the Apache Kafka connection.

Reading data from the topics in Kafka

You can configure the Apache Kafka connector to read messages from a Kafka data source.

Figure 1. Example of reading data from the Apache Kafka connector
Configuring the Apache Kafka connector as a source
  1. From the job design canvas, double-click the Apache Kafka connector.
  2. On the Stage tab, click Properties.
  3. Select Use DataStage properties.
  4. Enter the values for the required properties:
    • Topic name - Kafka topic to read messages from.
    • Key serializer - The serializer for the key data types in Apache Kafka. The Key serializer value must be compatible with the key column (if defined) in the Output link. If you select either an Avro or an Avro as JSON key serializer, the Value serializer must be set to either Avro or Avro as JSON.
    • Value serializer - The serializer for the value data types in Apache Kafka. The Value serializer value must be compatible with the value column (if defined) in the Output link.
    • Consumer group - A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group ID for multiple processes, you indicate that they are all part of the same consumer group.
    • Max poll records - The maximum number of records that are returned in a single call to poll.
    • Max messages - The maximum number of messages to consume from or produce to the topic, per player process. This value is a multiple of the Max poll records value. This setting is not active if you select Continuous mode.
    • Reset policy - The policy to apply when there is no initial offset in the Kafka server or when the current offset no longer exists on the server (for example, because the data was deleted). Use the values earliest and latest to automatically reset the offset to the earliest or the latest offset.
  5. Configure optional properties:
    • Continuous mode - The connector fetches messages continuously, waiting indefinitely for new incoming messages.
      • Stop message - The message that stops the continuous-mode processing.
    • Transaction - The settings for the transaction that sends the "End of wave" marker:
      • Record count - The number of records per transaction. The value 0 means all available records.
      • Time interval - The time interval for a transaction. Use this property if you set the Record count to 0.
    • End of wave - Send an "End of wave" marker after each transaction.
    • End of data - Insert an "End of wave" marker for the final set of records when their number is less than the transaction Record count value. If the transaction Record count value is 0 (all available records), there is only one transaction wave. In this case, set End of data to Yes so that the "End of wave" marker is inserted for that transaction wave.
    • Logging level - Configures the Kafka client logging. See Controlling log output.
    • Client configuration properties - Sets additional Kafka client properties. See Kafka client configuration properties.
  6. Click the Output tab to define the columns for the connector. The columns must be compatible with the data type that is defined in the Key serializer and Value serializer properties. If you use an Avro or an Avro as JSON serializer, the column names must match the Avro record fields. Refer to Schema registry for details.
    • value - The column that contains the Kafka message values. It must be compatible with the Value serializer property.
    • key - The column that contains the Kafka message keys. It must be compatible with the Key serializer property.
    • partition - The column that contains the partition number that the Kafka message comes from. It must be compatible with the Integer data type.
    • timestamp - The column that contains the timestamp for the Kafka message. It must be compatible with the Timestamp data type.
    • offset - The column that contains the offset for the Kafka message. It must be compatible with the BigInt data type.
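
For reference, the source properties above correspond to standard Apache Kafka consumer settings. The following standalone Java sketch is for illustration only and does not show how the connector is implemented; the broker address, topic name, consumer group, and String deserializers are placeholder assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadTopicSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka-host:9092");   // placeholder broker address
    props.put("group.id", "datastage-consumers");        // Consumer group
    props.put("max.poll.records", "100");                // Max poll records
    props.put("auto.offset.reset", "earliest");          // Reset policy: earliest or latest
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));   // Topic name
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
      for (ConsumerRecord<String, String> record : records) {
        // key, value, partition, timestamp, and offset correspond to the
        // columns that you can define on the Output link.
        System.out.printf("key=%s value=%s partition=%d timestamp=%d offset=%d%n",
            record.key(), record.value(), record.partition(),
            record.timestamp(), record.offset());
      }
    }
  }
}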

Writing data to topics in Kafka

You can configure the Apache Kafka connector to write data to a Kafka data source.

Figure 2. Example of writing data to the Kafka data source
Configuring the Apache Kafka connector as a target
  1. From the job design canvas, double-click the Apache Kafka connector.
  2. On the Stage tab, click Properties.
  3. Select Use DataStage properties.
  4. Enter the values for the required properties:
    • Topic name - The name of the topic to which the messages from the upstream stage are written.
    • Key serializer - The serializer for the key data types in Apache Kafka. The Key serializer value must be compatible with the key column (if defined) in the Input link. If you select either an Avro or an Avro as JSON key serializer, the Value serializer must be set to either Avro or to Avro as JSON.
    • Value serializer - The serializer for the value data types in Apache Kafka. The Value serializer value must be compatible with the value column (if defined) in the Input link.
  5. Configure optional properties, if needed.
  6. Click the Input tab to define the columns for the connector. The columns must be compatible with the data type that is defined in the Key serializer and Value serializer properties. If you use an Avro or an Avro as JSON serializer, the column names must match the Avro record fields. Refer to Schema registry for details.
    • value - The column that contains the Kafka message values. It must be compatible with the Value serializer property.
    • key - The column that contains the Kafka message keys. It must be compatible with the Key serializer property.
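
When the connector is a target, each input row becomes one Kafka message, with the key and value columns supplying the message key and value. The following standalone Java producer sketch illustrates the equivalent behavior for reference only; the broker address, topic name, message content, and String serializers are placeholder assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WriteTopicSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka-host:9092");   // placeholder broker address
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // One message per input row: the key and value columns on the Input link
      // supply the message key and value.
      producer.send(new ProducerRecord<>("my-topic", "42", "example message value"));
      producer.flush();
    }
  }
}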

Schema registry

The schema registry lets you work with more complex Kafka messages and map them to standard DataStage column definitions within job designs.

By default, messages are returned as a value within a single column that you define in the Apache Kafka connector. When you use the Avro or Avro as JSON format together with the schema registry, complex Kafka messages can be decomposed into DataStage columns.

When you write to a topic, the connector creates a schema that is based on the column definitions.

When the connector reads from a Kafka topic, the connector takes a schema from the schema registry. Based on that schema, the Avro record fields are extracted into columns that are defined as connector output.

For example, when the Avro schema is defined as follows:
{
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "salary",
      "type": "double"
    }
  ],
  "name": "KafkaConnectorSchema",
  "type": "record"
}
In the Apache Kafka connector, define the columns as follows:
Table 1. Column name and data type
Column name Data type
id Integer
name VarChar
salary Double

To use the schema registry, follow these configuration rules; an example sketch follows the list:

  • You must set either the Key serializer or the Value serializer configuration property to Avro or to Avro as JSON.
  • After you set the Key serializer to Avro or Avro as JSON, you cannot configure the Value serializer to a value other than Avro or Avro as JSON.
  • When you set the Key serializer configuration property to Avro, the column definition for the link must conform to the Avro schema definition.
  • Key columns must either be selected with the Key checkbox or their names must start with a key_ prefix. Use only one method. You cannot use a Key checkbox for one column and a key_ prefix for another column.
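
As an illustration of how the Avro fields in the example schema relate to the DataStage columns in Table 1, the following Java sketch publishes one record that conforms to KafkaConnectorSchema. It assumes a Confluent-compatible schema registry and its KafkaAvroSerializer, which might differ from your environment; the broker address, registry URL, topic name, and field values are placeholders.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroWriteSketch {
  // Same schema as the example above, embedded as a JSON string.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"KafkaConnectorSchema\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"int\"},"
      + "{\"name\":\"name\",\"type\":\"string\"},"
      + "{\"name\":\"salary\",\"type\":\"double\"}]}";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka-host:9092");                // placeholder
    props.put("schema.registry.url", "http://schema-registry:8081");  // placeholder
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "io.confluent.kafka.serializers.KafkaAvroSerializer");        // assumes Confluent client

    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
    GenericRecord row = new GenericData.Record(schema);
    // Each Avro field maps to a DataStage column of a compatible type
    // (id -> Integer, name -> VarChar, salary -> Double).
    row.put("id", 1);
    row.put("name", "Jane Doe");
    row.put("salary", 75000.0);

    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("employees", row));          // placeholder topic
      producer.flush();
    }
  }
}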

Kafka client configuration properties

Use the Client configuration properties (under Advanced Kafka configuration on the Stage tab) when you need to set a Kafka client configuration property for which the Apache Kafka connector does not provide a configuration option.

Enter each property in the key=value format, with one entry per line. Use the # character for comments. For a description of the rules for the client configuration properties, refer to the Java Properties definition.
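
For example, the entries might look like the following, where the property names are standard Kafka client settings and the values are placeholders for your environment:

# security settings for an SSL-secured listener (placeholder values)
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
# increase the allowed gap between poll() calls
max.poll.interval.ms=300000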

Troubleshooting

Controlling log output

Kafka client logging

Use the properties in the Advanced Kafka configuration section to control Kafka client logging. Kafka client log entries are written to the DataStage job log along with the log entries for the stage output.

  • Logging level: Sets the minimum level of Kafka client messages that are written to the job log.
  • Warning and error log entries: Change the value to Log as informational or to Log as Warning to control whether ERROR or FATAL messages are written to the job log with a severity that might cause a job failure.

To view Debug or Trace messages, set the associated logging level in the CC_MSG_LEVEL environment variable.

Kerberos authentication debugging information
To enable additional debug information when the Apache Kafka connector uses Kerberos authentication, set the following JVM properties in the CC_JVM_OPTIONS environment variable and add that variable to the job:
-Dcom.ibm.security.jgss.debug=all
-Dcom.ibm.security.krb5.Krb5Debug=all

Error messages

Table 2. Troubleshooting errors

Error type: KafkaProducer - Failed to update metadata after 20000 ms
Error details: The Kafka client does not report the connection problem when there is a mismatch between the authentication that is set on the Kafka broker side and the authentication that is set in the Apache Kafka connector. For example, the Apache Kafka connector is configured to connect without any security settings, but the connection is made on a port that is secured with SSL. When the Apache Kafka connector is configured to read messages from Kafka, the job stops responding at the connection step and the Kafka client does not report an error. You must stop the job manually.
Solution: Ensure that the connection settings (hostname, port, secure connection type, and properties) match the Kafka broker connection setup.

Error type: Commit cannot be completed
Error details: When the Apache Kafka connector is configured to run in Continuous mode, the job might fail with this message:

Kafka Connector was unable to commit messages due to following error: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Solution: In the stage Advanced section, change the Execution mode to Sequential.

Error type: Null Key
Error details: The keytab file is wrong, or the user principal credentials do not exist in the keytab file.
Solution: Ensure that the keytab file is correct.

Error type: Cannot find KDC for HADOOPBI.com
Error details: The Kafka server is not enabled in Kerberos mode.
Solution: Ensure that the Kafka server is configured correctly in Kerberos mode.

Error type: Checksum error; received checksum does not match computed checksum
Error details: The keytab file that is specified for the connection is different from the keytab file on the Kerberos server for the same user principal.
Solution: Ensure that the keytab file is correct for the user principal.

Error type: Clock skew too great
Error details: The current time on the Kerberos client is not in sync with the Kerberos server.
Solution: Ensure that the time difference is less than 5 minutes.

Error type: SAX parser exception thrown: The input ended before all started tags were ended. Last tag started was 'AdvancedKafkaConfigOptions'
Error details: The value of Client configuration properties can contain multiple lines of key=value entries. Because each property can be parameterized with the #name# format, the value is also checked for job parameters. At the same time, the value must conform to the Java Properties class requirements, in which the # character starts a comment. When a # character appears in the last line of this property, the preprocessor assumes that it starts a job parameter reference and looks for a matching second # character, and it fails if that character is not found.
Solution: In Client configuration properties, make sure that at least one empty line is present at the end of the property value.