Consuming messages from Kafka topics

You can use the KafkaConsumer node to receive messages that are published on a Kafka topic.

Before you begin

Read the following topics:

About this task

You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer node then receives messages that are published on the Kafka topic, as input to the message flow.

You can use a KafkaProducer node to publish messages from your message flow to a topic that is hosted on a Kafka server. You can use a KafkaRead node to read an individual message on a Kafka topic. For more information about using these nodes, see Producing messages on Kafka topics and Reading an individual message from a Kafka topic.

Each KafkaConsumer node consumes messages from a single topic; however, if the topic is defined to have multiple partitions, the KafkaConsumer node can receive messages from any of the partitions. For more information about partitions in Kafka topics, see the Apache Kafka documentation.

You can configure a KafkaConsumer node to save its position to the Kafka server by setting the Commit message offset to Kafka property on the Transactionality tab. When the position is saved to the Kafka server, if the flow is stopped and restarted, the KafkaConsumer node resumes processing at the next available message after the last message that was processed. Otherwise, it begins processing according to the option that you select for the Default message offset property on the Basic tab of the KafkaConsumer node (the options are Earliest and Latest). When you configure the KafkaConsumer node to save its position, three options are available:
  • On propagate. The KafkaConsumer node saves its position before the message is propagated to the next node in the message flow. The node waits until it receives an acknowledgment from the Kafka server that its position was saved.
  • On propagate async. The KafkaConsumer node does not wait for an acknowledgment that its position was saved.
  • Transactionally. The KafkaConsumer node saves its position to Kafka when the message that was received is successfully processed by the message flow. When this option is selected, you must specify a value for the Transactional Id property on the Transactionality tab of the KafkaConsumer node. This property enables the consumer position to be saved within the same unit of work as messages that are published by KafkaProducer nodes that use the same Transactional Id.
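
For background, the first two options correspond to the synchronous and asynchronous offset-commit calls in the Kafka client API. The following sketch uses the raw Apache Kafka Java client rather than the node itself, which makes these calls for you, so this is purely illustrative:

  import java.time.Duration;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  // Sketch: the client-level commit calls that correspond to On propagate
  // (synchronous) and On propagate async (asynchronous). 'consumer' is an
  // already-configured KafkaConsumer with enable.auto.commit=false.
  void pollAndCommit(KafkaConsumer<String, String> consumer) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> record : records) {
          // ... process the record (in the node's case, the message flow) ...
      }
      consumer.commitSync(); // On propagate: wait for the broker's acknowledgment
      // On propagate async: return immediately; handle failures in a callback:
      // consumer.commitAsync((offsets, exception) -> { /* log the exception */ });
  }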

You can configure the KafkaConsumer node to pause if it encounters a message that was published transactionally but is not yet committed, by using the Isolation level property on the Transactionality tab.

KafkaConsumer nodes can operate in a consumer group, which is identified by the Consumer group ID property. A consumer group enables messages that are published on a Kafka topic to be shared across the members of the group; the number of members in a consumer group cannot exceed the number of partitions that are defined on the Kafka topic. When you deploy message flows with the same Consumer group ID across multiple integration servers, the messages on a topic are shared across those integration servers. Alternatively, if you set the Kafka connections property, the KafkaConsumer node creates multiple connections to Kafka. When you use the Kafka connections property and also configure additional instances, message throughput is increased without deploying multiple copies of the message flow. However, message ordering is no longer preserved unless Commit message offset to Kafka is set to Transactionally.

You can also increase concurrency by deploying multiple KafkaConsumer nodes that share the same Consumer group ID; Kafka ensures that messages that are published on the topic are shared across the consumer group. For more information about how Kafka shares the message across multiple consumers in a consumer group, see the Apache Kafka documentation.
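
To make the sharing concrete, here is a minimal sketch that uses the raw Apache Kafka Java client (host, group, and topic names are placeholders). Each consumer created this way joins the same consumer group, and Kafka divides the topic's partitions among the members:

  import java.util.List;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  // Sketch: consumers created with the same group.id divide the topic's
  // partitions between them, which is the mechanism behind the node's
  // Consumer group ID property. All names shown are placeholders.
  KafkaConsumer<String, String> newGroupMember() {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "my.consumer.group");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(List.of("my-topic")); // each call adds one group member
      return consumer;
  }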

For more information about increasing the throughput of your message flows when consuming messages from a Kafka server, see Scaling messages with Kafka.

You can use Kafka custom header properties to add metadata to Kafka messages for use during message processing. These properties are set in the LocalEnvironment, in a folder called KafkaHeader. For more information, see Setting and retrieving Kafka custom header properties.
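
As an illustration, the following JavaCompute-style sketch reads one such header from the local environment after a KafkaConsumer node. The header name myHeader and the exact element path are assumptions here; the definitive folder layout is described in Setting and retrieving Kafka custom header properties:

  import com.ibm.broker.javacompute.MbJavaComputeNode;
  import com.ibm.broker.plugin.MbElement;
  import com.ibm.broker.plugin.MbException;
  import com.ibm.broker.plugin.MbMessageAssembly;

  // Sketch: a JavaCompute node that reads a custom Kafka header from the
  // KafkaHeader folder of the local environment ("myHeader" is a placeholder).
  public class ReadKafkaHeader extends MbJavaComputeNode {
      public void evaluate(MbMessageAssembly inAssembly) throws MbException {
          MbElement header = inAssembly.getLocalEnvironment().getRootElement()
                  .getFirstElementByPath("KafkaHeader/myHeader");
          if (header != null) {
              String value = header.getValueAsString();
              // ... use the header value during message processing ...
          }
          getOutputTerminal("out").propagate(inAssembly);
      }
  }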

Procedure

Complete the following steps to receive messages that are published on a Kafka topic:

  1. Create a message flow that contains a KafkaConsumer node and an output node.
  2. Configure the KafkaConsumer node by setting the following properties:
    1. On the Basic tab, set the following properties:
      • In the Topic name property, specify the name of the Kafka topic to which you want to subscribe.

        The topic name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • In the Bootstrap servers property, specify the host name and port of the Kafka server; for example, if you are using IBM Event Streams (Kafka on IBM Cloud), specify the address of that server.
      • In the Consumer group ID property, specify the ID of the consumer group to which this consumer belongs. This ID can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).
      • In the Default message offset property, specify the message offset that is used by the consumer when it starts, if no valid message offset exists; for example, when the consumer starts for the first time, or when the consumer is not committing its offset to Kafka. Possible values are:
        • Earliest
        • Latest

        This property defines the starting position in the log of messages from where the KafkaConsumer node starts reading messages. The default setting is Latest, which means that only messages published after the flow is started are read.

        If Earliest is selected, the KafkaConsumer node starts reading messages from the first message in the log of messages for the specified topic.

      • In the Client ID property, specify the client name to be used when connecting to the Kafka server.

        The client name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • Use the Add server name suffix to client ID property to specify whether you want to suffix the client ID with the name of the integration server and integration node. This property is selected by default, and adds the integration server and integration node name to the end of the client ID, in the following format:
        'integration_server_name'-'integration_node_name'
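
        For reference, the Basic tab properties map onto standard Kafka consumer client settings. The following sketch uses the raw Apache Kafka Java client; all values are placeholders, and the node manages this configuration for you:

          import java.util.Properties;
          import org.apache.kafka.clients.consumer.ConsumerConfig;

          // Sketch: the raw Kafka client settings behind the Basic tab
          // properties. All values shown are placeholders.
          Properties basicTabEquivalent() {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9092"); // Bootstrap servers
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "my.consumer.group");                // Consumer group ID
              props.put(ConsumerConfig.CLIENT_ID_CONFIG, "myClient-myServer-myNode");        // Client ID plus suffix
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");                // Default message offset
              return props;
          }
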
    2. On the Transactionality tab, complete the following steps:
      • Set Isolation level to one of the following values:
        • read_committed

          To receive only messages that were published outside of a transaction, or whose transaction has been committed, select read_committed. If the Kafka consumer reaches an uncommitted message, it pauses until the owning transaction is committed or rolled back.

        • read_uncommitted

          To receive all messages, including messages that are part of a transaction that is not yet committed, select read_uncommitted.

      • Set Commit message offset to Kafka to one of the following values:
        • None

          The KafkaConsumer node does not save its position to Kafka. When this option is selected, each time the flow starts, it begins processing messages according to the value of the Default message offset node property that you set on the Basic tab.

        • On propagate

          The KafkaConsumer node saves its position to Kafka before it propagates the received message to the next message flow node in the flow.

        • On propagate async

          The KafkaConsumer node saves its position to Kafka before it propagates the received message to the next message flow node in the flow. However, the consumer node does not wait for confirmation that the consumer position was successfully saved to Kafka. Using this option allows the Kafka consumer to process more messages, but if a failure occurs, the same message might be processed more than once.

        • Transactionally

          The KafkaConsumer node saves its position to Kafka as part of a transaction. The transaction is committed or rolled back at the end of the message flow. You can identify the transaction by the Transactional Id property on the KafkaConsumer node. If you specify the same value for Transactional Id on KafkaProducer nodes in the same message flow, messages are published within the same transaction that is used to save the consumer position. By using the Transactional Id property in this way, you enable exactly-once message patterns to be implemented.

      • Enter a value for Transactional Id.

        This field is mandatory if the Commit message offset to Kafka property is set to Transactionally; otherwise, the property is disabled. This value must be unique for every instance of the message flow. If the KafkaConsumer node is configured to use multiple Kafka connections, this value is suffixed with the connection number.

      • Enter a value for Transaction Timeout.

        This property specifies the timeout for the Kafka transaction. If the message flow does not complete within this time, the transaction is rolled back at the end of the flow, and the operations that were performed as part of the transaction are discarded. This field is disabled if Commit message offset to Kafka is not set to Transactionally.
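
        For background, the Transactionally option corresponds to Kafka's transactional consume-process-produce pattern. The following sketch uses the raw Apache Kafka Java client rather than the node, which performs all of this work internally; topic names are placeholders:

          import java.time.Duration;
          import java.util.HashMap;
          import java.util.Map;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import org.apache.kafka.common.TopicPartition;

          // Sketch: transactional consume-process-produce, the pattern behind
          // Commit message offset to Kafka = Transactionally. The producer was
          // created with a transactional.id and initTransactions() was called;
          // the consumer uses isolation.level=read_committed and
          // enable.auto.commit=false. Topic names are placeholders.
          void processOneBatch(KafkaConsumer<String, String> consumer,
                               KafkaProducer<String, String> producer) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
              if (records.isEmpty()) {
                  return;
              }
              producer.beginTransaction();
              try {
                  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                  for (ConsumerRecord<String, String> record : records) {
                      producer.send(new ProducerRecord<>("out-topic", record.key(), record.value()));
                      offsets.put(new TopicPartition(record.topic(), record.partition()),
                              new OffsetAndMetadata(record.offset() + 1));
                  }
                  // The consumer position is saved in the same unit of work as the
                  // published messages, so both commit or roll back together.
                  producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                  producer.commitTransaction();
              } catch (Exception e) {
                  producer.abortTransaction();
                  throw new RuntimeException(e);
              }
          }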

    3. On the Advanced tab, set the following properties:
      • In the Connection timeout (sec) property, specify the maximum time that the KafkaConsumer node waits to establish a connection with the Kafka server. The default value for this property is 15 seconds.
        Note: The value that is specified for the Connection timeout must be greater than the value specified for the Session timeout.
      • In the Session timeout (sec) property, specify the maximum time that the Kafka server should wait to receive confirmation (in the form of periodic 'heartbeats') that the KafkaConsumer node is live. This property is used to detect KafkaConsumer node failures when using Kafka's group management facility. The KafkaConsumer node sends periodic heartbeats to indicate its liveness to the Kafka server. If no heartbeats are received by the Kafka server before the expiration of this session timeout, the Kafka server removes this consumer from the group and initiates a rebalance. The minimum valid value for this property is 10 seconds, which ensures that the session timeout is greater than the length of time between heartbeats. The Session timeout value must be less than the Connection timeout value.
      • In the Receive batch size property, specify the maximum number of records that are received from the Kafka server in a single batch. This property is used for the max.poll.records value that is specified by the KafkaConsumer node when it receives messages from the Kafka server.
      • In the Properties file property, specify the path name of a kafka.properties file to be used to set Kafka client properties.

        You can use a properties file to set properties on the Kafka consumer that cannot be set through the KafkaConsumer node properties. If the properties file specifies a property that is also set on the KafkaConsumer node or by a Kafka policy, the value that is set in the properties file takes precedence. For the complete list of available properties, see the Apache Kafka documentation. An example properties file is shown after this step.

      • In the Header scheme property, specify how Kafka message headers are interpreted when messages are processed. Select String to interpret all Kafka headers as UTF-8 strings, or select Bytes to leave the headers in their raw binary format. The default value is String.
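
        For example, a kafka.properties file can set client-level tuning properties that have no corresponding node property. The property names in this sketch are standard Apache Kafka consumer properties; the values are illustrative only:

          # Example kafka.properties entries (illustrative values)
          fetch.min.bytes=1024
          fetch.max.wait.ms=500
          max.partition.fetch.bytes=1048576
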
    4. Optional: On the Avro tab, if you want to use an Avro schema to deserialize the message that is received, complete the following steps:
      1. In the "Serialization mode" field, select Avro from the menu.
      2. In the "Policy properties" field, specify the name of the schema registry policy that contains the Avro schema.
    For information about other properties that you can set for the KafkaConsumer node, see KafkaConsumer node. For information about setting properties by using a policy, see Kafka policy.

    For more information about how to diagnose connection problems between IBM App Connect Enterprise and Kafka, see Resolving problems when using Kafka nodes.

What to do next

For information about properties that can be overridden dynamically in the flow, see Using local environment variables with Kafka nodes.