Reading an individual message from a Kafka topic

You can use the KafkaRead node to read an individual message published on a Kafka topic, by specifying the offset position of the message in the topic.

Before you begin

Read the following topics:

About this task

You can use a KafkaRead node to help with error processing in a Kafka flow. For example, if the processing of a message fails in a Kafka consumer message flow, you can use the KafkaRead node to reprocess that individual message. You can also use a KafkaRead node in a flow with a Timer node, to control the processing of a range of messages in a partition on a topic. For more information, see Processing Kafka messages.

You can specify a message in a partition on a Kafka topic, by using the partition property on the KafkaRead node. For information about partitions in Kafka topics, see the Apache Kafka documentation.

For information about subscribing to topics on a Kafka server by using a KafkaConsumer node, see Consuming messages from Kafka topics. For information about publishing messages to a Kafka topic by using a KafkaProducer node, see Producing messages on Kafka topics.

By default, if the message at the specified offset is not available, the input message is sent to the No match terminal. Alternatively, you can choose to read either the earliest or the latest message in the partition if the specified message is unavailable, by setting one of those values in the Action when message unavailable property of the KafkaRead node. You can also override this value dynamically by using the mqsiapplybaroverride command.

Procedure

Complete the following steps to read an individual message that is published on a Kafka topic:

  1. Create a message flow containing an input node, a KafkaRead node, and an output node.
    For more information about how to do this, see Creating a message flow.
  2. Configure the KafkaRead node by setting the following properties:
    1. On the Basic tab, set the following properties:
      • In the Topic name property, specify the name of the Kafka topic containing the message that you want to read.

        The topic name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • In the Partition number property, specify the number of the Kafka partition for the topic that you want to use (valid values are between 0 and 255). The default value is 0.
      • In the Bootstrap servers property, specify the host name and port to be used for establishing the initial connection to the Kafka cluster; for example, if you are using IBM Event Streams (Kafka on IBM Cloud), specify the address of that server. You can specify multiple servers by defining a list of host/port pairs, separated by commas.
      • In the Message offset property, specify the offset position of the message to be read. This property defines the position in the log of messages from where the KafkaRead node will read the message. The default value is 0.
      • In the Action when message unavailable property, select the action that will be taken if the message at the specified offset is not available:
        • Select no match to propagate the input message to the No match terminal. This is the default value.
        • Select earliest to read the message with the earliest offset for the topic partition.
        • Select latest to read the message with the latest offset for the topic partition.
      • In the Client ID property, specify the client name to be used when connecting to the Kafka server.

        The client name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • Use the Add server name suffix to client ID property to specify whether you want to suffix the client ID with the name of the integration server. This property is selected by default, and adds the integration server name to the end of the client ID, in the following format:
        -integrationServerName
        for an independent integration server, or
        integration_node_name-integration_server_name
        for an integration server that is managed by an integration node.
      • In the Timeout (s) property, specify the length of time (in seconds) in which the KafkaRead node must read the message. If the message has not been read within this timeout period, the action specified by the Action when message unavailable property is taken.
    2. On the Security tab, set the following properties:
      • In the Security protocol property, select the protocol to be used when communicating with the Kafka server. Valid values are:
        • PLAINTEXT
        • SSL
        • SASL_PLAINTEXT
        • SASL_SSL

        The default value for this property is PLAINTEXT.

        If you are using Event Streams, this property must be set to SASL_SSL.

        If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that will be used to authenticate with the Kafka server by configuring the security identity, as described in Configuring security credentials for connecting to Kafka.

      • In the SSL protocol property, select the SSL protocol to be used if the Security protocol property is set to either SSL or SASL_SSL. You can select one of the following values from the editable list, or you can specify an alternative value:
        • TLSv1
        • TLSv1.1
        • TLSv1.2
        The default value for this property is TLSv1.2.
    For information about other properties that you can set for the KafkaRead node, see KafkaRead node. For information about setting properties by using a policy, see Kafka policy.

What to do next

For information about properties that can be overridden dynamically in the flow, see Using local environment variables with Kafka nodes.