Producing messages on Kafka topics

You can use the KafkaProducer node to connect to the Kafka messaging system and publish messages on Kafka topics.

Before you begin

Read the following topics:

About this task

You can use the KafkaProducer node to publish messages that are generated from within your message flow to a topic that is hosted on a Kafka server. The published messages are then available to be received by consumers (subscribers) reading from the topic.

You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. You can also use a KafkaRead node to read an individual message on a specified topic. For more information about using these nodes, see Consuming messages from Kafka topics and Reading an individual message from a Kafka topic.

You can configure the KafkaProducer node to publish messages transactionally or non-transactionally. When the KafkaProducer node operates non-transactionally, messages that are published are immediately available to the message consumers. If you set the Acks property on the KafkaProducer node to 1 or All, the KafkaProducer node waits for confirmation that the message was successfully received by the Kafka server before it continues in the flow. Failure to receive confirmation does not guarantee that the messages failed to be delivered to consumers. If this property is set to All, the KafkaProducer waits for confirmation from all replicas of the topic. This option provides the strongest available guarantee that the record was received. When Acks is enabled, the offset of the message that is published is stored in the local environment.
When the KafkaProducer node operates transactionally, messages that are published are not available to KafkaConsumer nodes that are configured with the Isolation level parameter set to read_committed unless the transaction is committed at the end of the message flow. If the message flows ends with an exception, the transaction is rolled-back and KafkaConsumer nodes that are configured with the read_committed option do not receive the message. KafkaConsumer nodes that are configured with read_uncommitted receive the messages even if they are subsequently rolled-back. When a transactional KafkaProducer node is operating in a message flow that was started with a transactional KafkaConsumer node that has the same value for the Transactional Id parameter, the saving of the consumer position and the publishing of messages can be enabled to occur within the same transaction. For a KafkaConsumer node and a KafkaProducer node to use the same transaction in this way, their configurations must have matching values for significant node properties, such as Bootstrap servers and Client ID. These values are compared, and the publish operation is rejected with an appropriate error message if differences are found.

You can use Kafka custom header properties to add metadata to Kafka messages for use during message processing. These properties are set in the LocalEnvironment, in a folder called KafkaHeader. For more information, see Setting and retrieving Kafka custom header properties.

Procedure

Complete the following steps to use IBM App Connect Enterprise to publish messages to a topic on a Kafka server:

  1. Create a message flow that contains an input node, such as an HTTPInput node, and a KafkaProducer node.
    For more information about how to create a message flow, see Creating a message flow.
  2. Configure the KafkaProducer node by setting the following properties:
    1. On the Basic tab, set the following properties:
      • In the Topic name property, specify the name of the Kafka topic on which you want to publish messages.

        The topic name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). The topic name can be changed dynamically by setting a local environment override. For more information, see Using local environment variables with Kafka nodes.

      • In the Bootstrap servers property, specify the hostname and port of the Kafka server that you want to connect to; for example, if you are using IBM Event Streams (Kafka on IBM Cloud), specify the address of that server.
      • In the Client ID property, specify the client name to be used when connecting to the Kafka server.

        The client name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • Use the Add server name suffix to client ID property to specify whether you want to suffix the client ID with the name of the integration server and integration node. This property is selected by default, and adds the integration server and integration node name to the end of the client ID, in the following format:
        'integration_server_name'-'integration_node_name'
      • In the Acks property, select the number of acknowledgments that are expected from the Kafka cluster in order to consider the message successfully published.

        If this property is set to 0, the KafkaProducer node does not wait for any acknowledgment that the publish request has been processed by the Kafka server. This option is equivalent to a 'fire and forget' mode of operation.

        If this property is set to 1, the KafkaProducer node waits for a single acknowledgment from the Kafka server.

        If this property is set to All, the KafkaProducer node waits for acknowledgments from all replicas of the topic. This option provides the strongest available guarantee that the record was received.

      • In the Timeout property, specify the time (in seconds) to wait for a request to complete. The default value is 60 seconds.
    2. On the Transactionality tab, complete the following steps:
      • Set Transaction mode to one of the following values:
        • Automatic

          The publish operation occurs transactionally if the input message was received transactionally.

        • Yes

          The publish operation occurs transactionally.

        • No

          The publish operation occurs non-transactionally.

      • Enter a value for Transactional Id.

        This field is mandatory if the publish operation is to occur transactionally. If the message is to be published with the same transaction started by a KafkaConsumer node, this value must match the value that is specified on the KafkaConsumer node. For more information, see KafkaConsumer node.

      • Enter a value for Transaction Timeout

        This field is disabled if Commit message offset to Kafka is set to Transactionally in the Transactionality tab of a KafkaConsumer node in the message flow. For more information, see KafkaConsumer node.

    3. On the Security tab, set the following properties:
      • In the Security protocol property, select the protocol to be used when communicating with the integration node. Valid values are:
        • PLAINTEXT
        • SSL
        • SASL_PLAINTEXT
        • SASL_SSL

        The default value for this property is PLAINTEXT.

        Note: If you are using Event Streams, this property must be set to SASL_SSL.

        If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password to use for authenticating with the Kafka server by configuring the security identity. For more information, see Configuring security credentials for connecting to Kafka.

      • In the SSL protocol property, select the SSL protocol to be used if the Security protocol property is set to either SSL or SASL_SSL. You can select one of the following values from the editable list, or you can specify an alternative value:
        • TLSv1
        • TLSv1.1
        • TLSv1.2
        • TLSv1.3
        The default value for this property is TLSv1.2.
    4. Optional: On the Avro tab, if you want to use an Avro schema to serialize the message that is produced, complete the following steps:
      1. In the "Serialization mode" field, select Avro from the menu.
      2. In the "Policy properties" field, specify the name of the schema registry policy that contains the Avro schema.
      3. Set the Schema Id by using one of the following methods:
        • Enter the value in the "Schema Id" field.
        • In a preceding node in the message flow, set the local environment property OutputLocalEnvironment.Destination.Kafka.Output.schemaId to the required value.
    For more information about other properties that you can set for the KafkaProducer node, see KafkaProducer node. For more information about setting properties by using a policy, see Kafka policy.

    For more information about how to diagnose connection problems between IBM App Connect Enterprise and Kafka, see Resolving problems when using Kafka nodes.

What to do next

For more information about properties that can be overridden dynamically in the flow, see Using local environment variables with Kafka nodes.