Consuming messages from Kafka topics
You can use the KafkaConsumer node to receive messages that are published on a Kafka topic.
Before you begin
About this task
You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer node then receives messages that are published on the Kafka topic, as input to the message flow.
You can use a KafkaProducer node to publish messages from your message flow to a topic that is hosted on a Kafka server. You can use a KafkaRead node to read an individual message on a Kafka topic. For more information about using these nodes, see Producing messages on Kafka topics and Reading an individual message from a Kafka topic.
Each KafkaConsumer node consumes messages from a single topic; however, if the topic is defined to have multiple partitions, the KafkaConsumer node can receive messages from any of the partitions. For more information about partitions in Kafka topics, see the Apache Kafka documentation.
You can control the point at which the KafkaConsumer node saves (commits) its position on the topic by setting the Commit message offset to Kafka property to one of the following values:
- On propagate. The KafkaConsumer node saves its position before the message is propagated to the next node in the message flow. The node waits until it receives an acknowledgment from the Kafka server that its position was saved.
- On propagate async. The KafkaConsumer node does not wait for an acknowledgment that its position was saved.
- Transactionally. The KafkaConsumer node saves its position to Kafka only when the message that was received has been successfully processed by the message flow. When this option is selected, you must specify a value for the Transactional Id property on the Transactionality tab of the KafkaConsumer node. This property enables the saving of the consumer position to occur within the same unit of work as messages that are published by KafkaProducer nodes that use the same Transactional Id.
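The practical difference between these options is what happens after a failure in the message flow. The following sketch is an in-memory illustration of that behavior, not the node's implementation: the topic, offsets, and "flow" are simulated, and the class and method names are invented for the example. It contrasts saving the position before propagation (a failed message is not redelivered) with saving it only after successful processing (a failed message is redelivered on restart).

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: how the point at which the consumer position is saved
// affects redelivery after a flow failure. Everything is simulated in memory.
public class CommitTiming {

    // Simulates one consumer that crashes the first time it processes "bad",
    // then restarts from its last saved position.
    // commitBeforePropagate = true  ~ "On propagate": position saved first.
    // commitBeforePropagate = false ~ "Transactionally": saved after success.
    static List<String> run(List<String> topic, boolean commitBeforePropagate) {
        List<String> processed = new ArrayList<>();
        long savedPosition = 0;              // last committed offset
        boolean crashed = false;

        long offset = savedPosition;
        while (offset < topic.size()) {
            String msg = topic.get((int) offset);
            if (commitBeforePropagate) {
                savedPosition = offset + 1;  // ack received, then propagate
            }
            if (msg.equals("bad") && !crashed) {
                crashed = true;              // flow fails mid-processing
                offset = savedPosition;      // restart from committed position
                continue;
            }
            processed.add(msg);
            if (!commitBeforePropagate) {
                savedPosition = offset + 1;  // commit only after success
            }
            offset++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("m1", "bad", "m3");
        System.out.println(run(topic, true));  // [m1, m3] - "bad" is skipped
        System.out.println(run(topic, false)); // [m1, bad, m3] - redelivered
    }
}
```

The simulation shows why the Transactionally option is the one to choose when every message must be processed at least once.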
You can configure the KafkaConsumer node, by using the Isolation Level property on the Transactionality tab, to pause if it encounters a message that has been published transactionally but not yet committed.
KafkaConsumer nodes can operate in a consumer group, which is identified by the Consumer group ID property; messages that are published on a Kafka topic are then shared across the members of the group. The number of members in a consumer group cannot exceed the number of partitions that are defined on the Kafka topic. When you deploy message flows with the same Consumer group ID to multiple integration servers, the messages on a topic are shared across those integration servers. Alternatively, if you configure the Kafka connections property, the KafkaConsumer node creates multiple connections to Kafka. When you use the Kafka connections property and also configure additional instances, message throughput is increased without deploying multiple copies of the message flow; however, message ordering is no longer preserved unless Commit message offset to Kafka is set to Transactionally.
You can also increase concurrency by deploying multiple KafkaConsumer nodes that share the same Consumer group ID; Kafka ensures that messages that are published on the topic are shared across the consumer group. For more information about how Kafka shares the message across multiple consumers in a consumer group, see the Apache Kafka documentation.
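Kafka's sharing of a topic across a consumer group amounts to dividing the topic's partitions among the group members. The following self-contained sketch illustrates that idea (it is not App Connect code, and the round-robin scheme is a simplification of Kafka's actual assignor strategies): members beyond the partition count receive no partitions, which is why a group larger than the partition count gains no extra throughput.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of consumer-group partition sharing: each partition of a
// topic is owned by exactly one member of the group at a time.
public class GroupAssignment {

    // Returns, for each consumer, the list of partition numbers it owns,
    // using a simple round-robin distribution.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 3 partitions shared across a group of 4 consumers:
        // the fourth member owns nothing and sits idle.
        System.out.println(assign(3, 4)); // [[0], [1], [2], []]
    }
}
```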
For more information about increasing the throughput of your message flows when consuming messages from a Kafka server, see Scaling messages with Kafka.
You can use Kafka custom header properties to add metadata to Kafka messages for use during message processing. These properties are set in the LocalEnvironment, in a folder called KafkaHeader. For more information, see Setting and retrieving Kafka custom header properties.
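Conceptually, custom header properties are name/value metadata that travel with a Kafka message alongside its body. The following plain-Java sketch models that idea only; it is not the App Connect API, and the header name "sourceSystem" is a hypothetical example. In a message flow, the KafkaConsumer node surfaces such properties in the LocalEnvironment, under the KafkaHeader folder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a Kafka message with custom header metadata,
// modeled here as a simple name/value map.
public class HeaderSketch {

    record KafkaMessage(String body, Map<String, String> headers) {}

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("sourceSystem", "orders-service"); // hypothetical metadata
        KafkaMessage msg = new KafkaMessage("{\"orderId\": 42}", headers);

        // Downstream logic can branch on the metadata without parsing the body.
        System.out.println(msg.headers().get("sourceSystem")); // orders-service
    }
}
```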
Procedure
Complete the following steps to receive messages that are published on a Kafka topic: