Consuming messages from Kafka topics

You can use the KafkaConsumer node to receive messages that are published on a Kafka topic.

Before you begin

Read the following topics:

About this task

You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer node then receives messages that are published on the Kafka topic, as input to the message flow.

You can use a KafkaProducer node to publish messages from your message flow to a topic that is hosted on a Kafka server, and you can use a KafkaRead node to read an individual message on a Kafka topic. For more information about using these nodes, see Producing messages on Kafka topics and Reading an individual message from a Kafka topic.

Each KafkaConsumer node consumes messages from a single topic; however, if the topic is defined to have multiple partitions, the KafkaConsumer node can receive messages from any of the partitions. For more information about partitions in Kafka topics, see the Apache Kafka documentation.

The KafkaConsumer node reads messages from Kafka non-transactionally, which means that, if an error occurs or the message is rolled back to the input node, and no catch terminal is connected, the message is not reprocessed by the input node.

In order to process messages that are received concurrently, you can configure additional instances on the KafkaConsumer node. When additional instances are configured, a single Kafka message consumer is created, and the messages are distributed to the additional flow instances. As messages are processed concurrently, message ordering is not preserved when additional instances are being used. For more information about specifying additional instances, see KafkaConsumer node.

You can also increase concurrency by deploying multiple KafkaConsumer nodes that share the same Group ID; Kafka ensures that messages that are published on the topic are shared across the consumer group. For more information about how Kafka shares the message across multiple consumers in a consumer group, see the Apache Kafka documentation.

You can use Kafka custom header properties to add metadata to Kafka messages for use during message processing. These properties are set in the LocalEnvironment, in a folder called KafkaHeader. For more information, see Setting and retrieving Kafka custom header properties.

Procedure

Complete the following steps to receive messages that are published on a Kafka topic:

  1. Create a message flow containing a KafkaConsumer node and an output node.
  2. Configure the KafkaConsumer node by setting the following properties:
    1. On the Basic tab, set the following properties:
      • In the Topic name property, specify the name of the Kafka topic to which you want to subscribe.

        The topic name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • In the Bootstrap servers property, specify the host name and port of the Kafka server; for example, if you are using IBM Event Streams (Kafka on IBM Cloud), specify the address of that server.
      • In the Consumer group ID property, specify the ID of the consumer group to which this consumer belongs. This ID can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).
      • In the Default message offset property, specify the message offset that is used by the consumer when it starts, if no valid message offset exists; for example, when the consumer starts for the first time, or when the consumer is not committing its offset to Kafka. Possible values are:
        • Earliest
        • Latest

        This property defines the starting position in the log of messages from where the KafkaConsumer node starts reading messages. The default setting is Latest, which means that only messages published after the flow is started will be read.

        If Earliest is selected, the KafkaConsumer node starts reading messages from the first message in the log of messages for the specified topic.

        If the Commit message offset in Kafka property is not selected, this action is repeated each time the flow containing the KafkaConsumer node is started. If the Commit message offset in Kafka property is selected, the consumer position in the log of messages for the topic is saved in Kafka as each message is processed; therefore, if the flow is stopped and then restarted, the input node starts consuming messages from the message position that had been reached when the flow was stopped.

      • Use the Commit message offset in Kafka property to specify whether the current message offset in Kafka is saved automatically, which allows messages to be consumed from the saved position when the consumer is restarted. This property is selected by default. If the integration server or integration node is restarted, the last saved position is used.
      • Use the Wait for message offset commit to complete property to control whether the KafkaConsumer node waits for the message offset to be committed to Kafka before delivering the message to the message flow. If this property is not selected, the KafkaConsumer node does not wait for the response from the commit operation to be received before delivering the message to the flow. As a result, message throughput is increased in exchange for a reduction in message reliability, as messages can be redelivered to the message flow if the request to commit the consumer offset subsequently fails. This property is selected by default.
      • In the Client ID property, specify the client name to be used when connecting to the Kafka server.

        The client name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • Use the Add server name suffix to client ID property to specify whether you want to suffix the client ID with the name of the integration server and integration node. This property is selected by default, and adds the integration server and integration node name to the end of the client ID, in the following format:
        'integration_server_name'-'integration_node_name'
    2. On the Advanced tab, set the following properties:
      • In the Connection timeout (sec) property, specify the maximum time that the KafkaConsumer node will wait to establish a connection with the Kafka server. The default value for this property is 15 seconds.
        Note: The value specified for the Connection timeout must be greater than the value specified for the Session timeout.
      • In the Session timeout (sec) property, specify the maximum time that the Kafka server should wait to receive confirmation (in the form of periodic 'heartbeats') that the KafkaConsumer node is live. This property is used to detect KafkaConsumer node failures when using Kafka's group management facility. The KafkaConsumer node sends periodic heartbeats to indicate its liveness to the Kafka server. If no heartbeats are received by the Kafka server before the expiration of this session timeout, the Kafka server removes this consumer from the group and initiates a rebalance. The minimum valid value for this property is 10 seconds, which ensures that the session timeout is greater than the length of time between heartbeats. The Session timeout value must be less than the Connection timeout value.
      • In the Receive batch size property, specify the maximum number of records that are received from the Kafka server in a single batch. This property is used for the max.poll.records value specified by the KafkaConsumer node when receiving messages from the Kafka server.
    3. On the Security tab, set the following properties:
      • In the Security protocol property, select the protocol to be used when communicating with the integration node. Valid values are:
        • PLAINTEXT
        • SSL
        • SASL_PLAINTEXT
        • SASL_SSL

        The default value for this property is PLAINTEXT.

        Note: If you are using Event Streams, this property must be set to SASL_SSL.

        If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that will be used to authenticate with the Kafka server by configuring the security identity, as described in Configuring security credentials for connecting to Kafka.

      • In the SSL protocol property, select the SSL protocol to be used if the Security protocol property is set to either SSL or SASL_SSL. You can select one of the following values from the editable list, or you can specify an alternative value:
        • TLSv1
        • TLSv1.1
        • TLSv1.2
        • TLSv1.3
        The default value for this property is TLSv1.2.
    For information about other properties that you can set for the KafkaConsumer node, see KafkaConsumer node. For information about setting properties by using a policy, see Kafka policy.

    For more information about how to diagnose connection problems between IBM App Connect Enterprise and Kafka, see Resolving problems when using Kafka nodes.

What to do next

For information about properties that can be overridden dynamically in the flow, see Using local environment variables with Kafka nodes.