You can use the KafkaRead node to read an
individual message published on a Kafka topic, by specifying the offset position of the message in
the topic.
Before you begin
Read the following topics:
About this task
You can use a KafkaRead node to help with error processing in a Kafka flow. For
example, if the processing of a message fails in a Kafka consumer message flow, you can use the KafkaRead node to reprocess that individual message. You can also
use a KafkaRead node in a flow with a Timer node, to control
the processing of a range of messages in a partition on a topic. For more information, see Processing Kafka messages.
You can specify the partition on a Kafka topic from which the message is read, by using the
Partition number property on the KafkaRead
node. For information about partitions in Kafka topics, see the Apache Kafka
documentation.
For information about subscribing to topics on a Kafka server by using a KafkaConsumer node, see Consuming messages from Kafka topics. For information about publishing messages
to a Kafka topic by using a KafkaProducer node, see Producing messages on Kafka topics.
By default, if the message at the specified offset is not available, the input message is sent to
the No match terminal. Alternatively, you can choose to read either the earliest or the latest
message in the partition if the specified message is unavailable, by setting one of those values in
the Action when message unavailable property of the KafkaRead node. You can also override this value dynamically by
using the mqsiapplybaroverride command.
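The behavior of the Action when message unavailable property can be pictured with a small in-memory model. The following Python sketch is illustrative only and does not use any product API: the function name read_message, the dictionary that stands in for a partition log, and the "read" outcome label are all hypothetical. It also assumes that an empty partition always results in no match.

```python
from typing import Dict, Optional, Tuple


def read_message(partition_log: Dict[int, str], offset: int,
                 action: str = "no match") -> Tuple[str, Optional[str]]:
    """Model a read at `offset`; returns (outcome, message).

    `partition_log` maps offsets to messages and is a stand-in for a
    Kafka partition (hypothetical, for illustration only).
    """
    if action not in ("no match", "earliest", "latest"):
        raise ValueError(f"unknown action: {action!r}")

    # The message at the requested offset is available: read it.
    if offset in partition_log:
        return ("read", partition_log[offset])

    # Default action, or nothing to fall back to (assumption for an
    # empty partition): the input message goes to the No match terminal.
    if action == "no match" or not partition_log:
        return ("no match", None)

    # Otherwise fall back to the earliest or latest offset in the partition.
    fallback = min(partition_log) if action == "earliest" else max(partition_log)
    return ("read", partition_log[fallback])


# Example: offsets 0 to 4 are no longer available in this partition.
log = {5: "e", 6: "f", 7: "g"}
print(read_message(log, 6))              # message at offset 6 is available
print(read_message(log, 2))              # default action
print(read_message(log, 2, "earliest"))  # falls back to offset 5
print(read_message(log, 2, "latest"))    # falls back to offset 7
```

In this model, the fallback is consulted only when the requested offset is missing, which mirrors the description above: the property has no effect on a read that succeeds.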
Procedure
Complete the following steps to read an individual message that is published on a Kafka
topic:
- Create a message flow containing an input node, a KafkaRead node, and an output node.
- Configure the KafkaRead node by setting the
following properties:
- On the Basic tab, set the following properties:
- In the Topic name property, specify the name of the
Kafka topic containing the message that you want to read.
The topic name can be up to 255
characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _
(underscore), and - (dash).
- In the Partition number property, specify the number
of the Kafka partition for the topic that you want to use (valid values are between
0 and 255). The default value is 0.
- In the Bootstrap servers property, specify the host
name and port to be used for establishing the initial connection to the Kafka cluster; for example,
if you are using IBM Event
Streams (Kafka on IBM Cloud), specify the address of that server. You can specify
multiple servers by defining a list of host/port pairs, separated by commas.
- In the Message offset property, specify the offset
position of the message to be read. This property defines the position in the log of messages from
which the KafkaRead node reads the message. The default
value is 0.
- In the Action when message unavailable property,
select the action that will be taken if the message at the specified offset is not available:
- Select no match to propagate the input message to
the No match terminal. This is the default value.
- Select earliest to read the message with the
earliest offset for the topic partition.
- Select latest to read the message with the latest
offset for the topic partition.
- In the Client ID property, specify the client name to
be used when connecting to the Kafka server.
The client name can be up to 255 characters in
length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and -
(dash).
- Use the Add server name suffix to client ID property
to specify whether you want to suffix the client ID with the name of the integration server. This
property is selected by default, and adds the integration server name to the end of the client ID,
in the following format: -integrationServerName for an
independent integration server, or integration_node_name-integration_server_name for
an integration server that is managed by an integration node.
- In the Timeout (s) property, specify the length of
time (in seconds) in which the KafkaRead node must read the
message. If the message has not been read within this timeout period, the action specified by the
Action when message unavailable property is taken.
- On the Security tab, set the following properties:
- In the Security protocol property, select the protocol
to be used when communicating with the Kafka server. Valid values are:
- PLAINTEXT
- SSL
- SASL_PLAINTEXT
- SASL_SSL
The default value for this property is PLAINTEXT.
If you are using Event Streams, this
property must be set to SASL_SSL.
If either
SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that will be used to
authenticate with the Kafka server by configuring the security identity, as described in Configuring security credentials for connecting to Kafka.
- In the SSL protocol property, select the SSL protocol
to be used if the Security protocol property is set to
either SSL or SASL_SSL. You can select one of the following values from the editable list, or you can
specify an alternative value:
The default value for this property is TLSv1.2.
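The constraints listed in the preceding steps can be checked before you deploy a flow. The following Python sketch is one possible way to do that, not part of the product: all function names are hypothetical, the partition range is assumed to be inclusive, and an empty topic name or client ID is assumed to be invalid.

```python
import re
from typing import List, Tuple

# Characters and maximum length stated for Topic name and Client ID.
# Assumption: at least one character is required.
_NAME_RE = re.compile(r"[A-Za-z0-9._-]{1,255}")

# Values listed for the Security protocol property.
SECURITY_PROTOCOLS = ("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")


def is_valid_name(value: str) -> bool:
    """Check a topic name or client ID against the documented rules."""
    return _NAME_RE.fullmatch(value) is not None


def is_valid_partition(number: int) -> bool:
    """Check a partition number (assumed inclusive range 0 to 255)."""
    return 0 <= number <= 255


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated list of host:port pairs."""
    servers = []
    for entry in value.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdecimal():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


def needs_security_identity(protocol: str) -> bool:
    """True if a user ID and password must be configured."""
    if protocol not in SECURITY_PROTOCOLS:
        raise ValueError(f"unknown security protocol: {protocol!r}")
    return protocol in ("SASL_PLAINTEXT", "SASL_SSL")


def uses_ssl_protocol(protocol: str) -> bool:
    """True if the SSL protocol property applies."""
    if protocol not in SECURITY_PROTOCOLS:
        raise ValueError(f"unknown security protocol: {protocol!r}")
    return protocol in ("SSL", "SASL_SSL")


# Example values (hypothetical host names and topic name).
print(is_valid_name("orders.v1_retry-2"))
print(parse_bootstrap_servers("broker1.example.com:9092, broker2.example.com:9093"))
print(needs_security_identity("SASL_SSL"), uses_ssl_protocol("SASL_SSL"))
```

A check like this only catches malformed values; it cannot confirm that the topic, partition, or servers actually exist.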
For information about other properties that you can set for the
KafkaRead node, see
KafkaRead node.
For information about setting properties by using a policy, see Kafka policy.
What to do next
For information about properties that can be overridden dynamically in the flow, see Using local environment variables with Kafka nodes.