Kafka

Kafka connector enables seamless data exchange with both Apache Kafka and Confluent Kafka, which are distributed publish/subscribe messaging systems. The connector provides a unified, high-throughput, and low-latency platform for managing data streams effectively. Using the Kafka connector, you can create actions and run various operations, such as retrieving data from or inserting data into Kafka topics.

Kafka connector consists of the following components:

  • Account - Facilitates the connection with Apache Kafka or Confluent Kafka servers. Configuring an account is essential to run any operation through Kafka actions.
  • Actions - Enable you to produce and consume messages from Kafka topics. Actions are based on templates that are provided by the Kafka connector. Each action represents a specific task that is run on a resource, such as using the Produce action to publish a message to a specified Kafka topic.

Account Types

A Kafka connector account is used to connect to a Kafka cluster, establishing a network connection to one or more Kafka brokers that use the security protocol specified. Kafka connector provides two types of accounts:

  • Producer Account is used to publish messages to a specific Kafka topic. The producer-related actions use the Producer Account configurations, such as serialization, batch processing, and compression, to process messages before sending them to the Kafka server.
  • Consumer Account is used to consume messages from one or more specified Kafka topics. The consumer-related actions use the Consumer Account configurations, such as deserialization, to process messages after receiving them from the Kafka server, and to handle group coordination and offset management.

Creating a New Account

  1. Select your project.

  2. Click Connectors. The list of available connectors appears.

  3. In the search bar for Available connectors,

    • Click the Search icon.
    • Type Kafka.
    • Click the Kafka icon.
  4. Specify the following details in the Add Account page to create an account that can be used to access Kafka systems:
    • Name - Name of the account by which the Kafka server is known to Kafka developers, clients, and partners.

    • Description- Description of the account.

    • Distribution type- Choose the type of Kafka distribution that you want to connect to. You can choose either Confluent Kafka (connects to Confluent Kafka distribution) or Apache Kafka (connects to Apache Kafka distribution).

      Note: Schema Registry and Avro serializer and deserializer are not supported.
    • Client type- Type of account that represents whether you want to connect to a Kafka server as a producer or consumer. Possible values are -
      • Producer - Account type used to publish messages to a specific Kafka topic.
      • Consumer - Account type used to consume messages from one or more specified Kafka topics.
    • Bootstrap servers- List of Kafka server names with the combination of host and port, which is used for establishing the initial connection to the Kafka cluster. Use a comma (,) to delimit multiple servers and a colon (:) to delimit host and port. For example - host1:port1,host2:port2.
    • Key serializer- Serializer type for the key. Available if the Client type is Producer.
    • Value serializer- Serializer type for the value. Available if the Client type is Producer.

    • Key deserializer- Deserializer type for the key. Available if the Client type is Consumer.

    • Value deserializer- Deserializer type for the value. Available if the Client type is Consumer.

    • Client ID- String identifying the client, which is passed to the Kafka server when a request is sent.

    • Security protocol- Mode of communication and authentication mechanism that is used to exchange data that includes credentials. Possible values are -
      • PLAINTEXT- Mode of communication where the data is exchanged as plain text with no authentication. No additional configuration is required.
      • SASL_PLAINTEXT- Default. SASL is used for authentication. However, data that includes the credentials is exchanged in plain text with no encryption.
      • SASL_SSL- SASL is used for authentication and the data is SSL encrypted before exchange. This is the most secure mode of authentication and encryption.
      • SSL- Authentication is not done, but data that includes credentials is SSL encrypted before the exchange. For more information about authentication mechanisms and JAAS configuration in Kafka servers, see the Kafka documentation. For more information about configuring keystore and truststore, see Work with projects, and How to generate a private-public key pair using OpenSSL.
      Note: When editing Apache Kafka accounts, if the Security protocol is set to -
      • PLAINTEXT, you cannot switch to other secure modes.

      • SASL_PLAINTEXT, SASL_SSL, SSL, you can switch to any of the three secure modes but not to PLAINTEXT.

    • Group ID- Unique string that identifies the consumer group of the consumer account. Consumers in the group share the load of processing messages from topic partitions. If a consumer from a group fails, the remaining consumers in the group can take over its partitions. Available if the Client type is Consumer.

  • If the Client type is Producer, enter the following properties (a client-property sketch follows this list):
    • Acknowledgments- Number of acknowledgments the producer requires the lead broker to have received before considering a request as complete. This controls the durability of messages that are sent. For more information, see the Kafka documentation. The allowed values are as follows -
      • None- High risk of data loss. The producer does not wait for any acknowledgment from the broker. The message is immediately added to the socket buffer and considered sent. There is no confirmation that the server received the message. The Message send retries setting does not take effect, and the offset returned for each message is always set to -1.
      • Leader- Partial risk of data loss. The lead broker writes the message to its local log but responds without waiting for full acknowledgment from all followers. If the leader fails immediately after acknowledging the record but before the followers replicate it, the message is lost.
      • Replica- No risk of data loss. The lead broker waits for the full set of in-sync replicas to acknowledge the message. This helps make sure that the message is not lost as long as at least one in-sync replica remains alive.
    • Request timeout (ms)- Maximum amount of time in milliseconds that the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. The default value is 30000.

    • Compression type- Algorithm used to compress the messages before they are sent to the Kafka broker. Possible values are None, GZip, Snappy, LZ4, Zstandard. The default value is None.

    • Message send retries- Number of attempts by a Kafka producer to send a message to the Kafka broker if the initial attempt fails. This helps make sure that the messages are delivered reliably in a distributed system, even when the first attempt fails due to network problems, broker unavailability, or other temporary issues. The default value is INT32_MAX, which is 2147483647.

    • Batch size (bytes)- Maximum size in bytes of the batches in which the Kafka producer consolidates messages that are sent to the same partition before sending them to the Kafka broker. Sending messages in batches improves throughput by reducing the number of requests that are sent, thus making more efficient use of network resources. The default batch size is 16384.

    • TCP send buffer (bytes)- Size of the TCP send buffer SO_SNDBUF in bytes when sending data. If the value is set to -1, Kafka defaults to the operating system settings for the TCP send buffer size, letting the operating system manage the buffer size according to its own defaults and optimizations. The default value is 131072.
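
    These producer settings correspond to standard Apache Kafka producer client properties. The following minimal Java sketch shows how an equivalent configuration looks on a plain Kafka producer client; the broker addresses (host1:9092, host2:9092) and the topic name (customer-events) are hypothetical placeholders, not values defined by the connector.

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;

      public class ProducerAccountSketch {
          public static void main(String[] args) {
              Properties props = new Properties();
              // Bootstrap servers: comma-delimited host:port pairs (hypothetical hosts).
              props.put("bootstrap.servers", "host1:9092,host2:9092");
              // Key and value serializers for String keys and values.
              props.put("key.serializer", StringSerializer.class.getName());
              props.put("value.serializer", StringSerializer.class.getName());
              // Acknowledgments: "0" = None, "1" = Leader, "all" = Replica.
              props.put("acks", "all");
              // Defaults described above: request timeout, compression, retries, batch size.
              props.put("request.timeout.ms", "30000");
              props.put("compression.type", "none");
              props.put("retries", Integer.toString(Integer.MAX_VALUE)); // 2147483647
              props.put("batch.size", "16384");
              // TCP send buffer; -1 defers to the operating system default.
              props.put("send.buffer.bytes", "131072");

              try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                  producer.send(new ProducerRecord<>("customer-events", "key-1", "hello"));
              }
          }
      }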

  • If the Client type is Consumer, enter the following properties (a client-property sketch follows this list):
    • Enable auto commit- Enables or disables the Kafka broker’s ability to automatically commit the consumer’s offset. When the value is set to True, auto-commit is enabled and the Kafka broker takes responsibility for committing the consumer’s offset at the interval specified in the Auto commit interval (ms) field. When the value is set to False, auto-commit by the Kafka broker is disabled and the Kafka connector is responsible for committing the consumer’s offsets after processing the messages. The default value is True.
    • Auto commit interval (ms)- Frequency in milliseconds at which the Kafka broker auto-commits the consumer offsets to the Kafka server, if Enable auto commit is set to True. The default value is 5000.
    • Auto offset reset- Defines how the Kafka consumer behaves when it needs to start reading from a topic without a previously committed offset, or when the committed offset is invalid or out of range. Select one of the following values:
      • Earliest- Consumer starts reading from the earliest offset available in the topic, meaning it reads all messages from the beginning of the topic if there are no previously committed offsets.
      • Latest- Consumer starts reading from the latest offset, meaning it will receive only the new messages that are produced after the consumer starts. It does not process any historical messages that were produced before it started. Use this option if you care about new data and do not need to process historical messages. This is the default setting for many Kafka clients.
      • None- Default. The consumer throws an exception if no previous offset is found for the consumer’s group. Use this option when it is critical to handle missing or invalid offsets explicitly and you must ensure that no processing occurs if no offsets are available.
    • Session timeout (ms)- Time in milliseconds that the Kafka broker waits without receiving a heartbeat from the consumer before it considers the consumer failed and rebalances the group, when using Apache Kafka’s group management facility. The default value is 45000.
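
    These consumer settings correspond to standard Apache Kafka consumer client properties. A minimal Java sketch under the same assumptions as the producer example (hypothetical broker addresses and topic name, plus a hypothetical group ID customer-consumers):

      import java.time.Duration;
      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class ConsumerAccountSketch {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "host1:9092,host2:9092");   // hypothetical hosts
              props.put("group.id", "customer-consumers");                // maps to Group ID
              props.put("key.deserializer", StringDeserializer.class.getName());
              props.put("value.deserializer", StringDeserializer.class.getName());
              // Broker-managed offset commits at the given interval (defaults described above).
              props.put("enable.auto.commit", "true");
              props.put("auto.commit.interval.ms", "5000");
              // Where to start when no committed offset exists: "earliest", "latest", or "none".
              props.put("auto.offset.reset", "earliest");
              props.put("session.timeout.ms", "45000");

              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                  consumer.subscribe(List.of("customer-events"));         // hypothetical topic
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.printf("offset=%d key=%s value=%s%n",
                              record.offset(), record.key(), record.value());
                  }
              }
          }
      }
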
  • In the Security credentials section,

    • Truststore- Alias of the truststore configuration. The truststore contains trusted certificates that are used to determine the trust for the remote server peer certificates. You can add a Truststore certificate by selecting the "+" icon next to the Truststore Alias drop-down list. Available if the Security protocol is SASL_SSL or SSL.
    • Keystore- Alias for the keystore configuration. You can add a Keystore certificate by selecting the "+" icon next to the Keystore Alias drop-down list. Available if the Security protocol is SASL_SSL or SSL.
    • JAAS config- JAAS login context parameters in the format used by JAAS configuration files. Available if the Security protocol is SASL_PLAINTEXT or SASL_SSL. For example, org.apache.kafka.common.security.plain.PlainLoginModule required username="KafkaConnector" password="cert@XXXX";
    • In the Other properties section, you can specify any additional properties that you prefer to add for the account. Type the property name and enter the value in the corresponding input text field.
      • Property name: Name of the property.
      • Property value: Value for the property.
      Note: If the Security protocol is set to SASL_PLAINTEXT or SASL_SSL, then the default SASL mechanism is GSSAPI. If you want to use another SASL mechanism, configure it in the Other properties section. For example, to configure the SASL mechanism as PLAIN, add the property sasl.mechanism with the value PLAIN. A client-property sketch follows this list.
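
    For reference, the note above maps to the standard Kafka client properties sasl.mechanism and sasl.jaas.config. The following is a minimal Java sketch, with hypothetical credentials, of how a PLAIN override looks when expressed directly as client properties:

      import java.util.Properties;

      public class SaslPlainSketch {
          public static void main(String[] args) {
              Properties props = new Properties();
              // SASL authentication over an unencrypted connection.
              props.put("security.protocol", "SASL_PLAINTEXT");
              // Override the default GSSAPI mechanism with PLAIN, as described in the note above.
              props.put("sasl.mechanism", "PLAIN");
              // JAAS login context; the username and password are hypothetical placeholders.
              props.put("sasl.jaas.config",
                      "org.apache.kafka.common.security.plain.PlainLoginModule required "
                      + "username=\"KafkaConnector\" password=\"cert@XXXX\";");
              props.forEach((key, value) -> System.out.println(key + "=" + value));
          }
      }
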
  • Click Next. The Add account > Advanced settings page appears.

  • Provide the following details in the Advanced settings page to configure connection pooling manually:

    • Enable connection pooling- Toggle the slider to the right to enable configuration of the connection pooling details. Disable connection pooling to use system-defined values. Connection pooling is disabled by default.

    • Minimum pool Size- Number of connections to create when connection pooling is enabled. The system maintains the specified Minimum Pool Size of connections, irrespective of whether these connections remain idle.
    • Maximum pool Size- Maximum number of connections that can exist at a time in the connection pool.
    • Block timeout- Number of milliseconds that IBM® webMethods Integration waits to obtain a connection with the Kafka server before it times out and returns an error. For example, suppose you have a pool with a maximum pool size of 20. If 30 simultaneous connection requests are received, 10 requests are queued, awaiting a connection from the pool.
      • If you set the Block timeout value to 5000, the 10 requests wait for 5 seconds to establish a connection before they time out and return an error. If the services that use the connections require 10 seconds to complete and return connections to the pool, the pending requests fail and return an error stating that connections are unavailable.
      • If you set the Block timeout value too high, you might encounter an error. If a request contains errors that delay the response, other requests will not be sent. The Block timeout must be tuned along with the Maximum pool Size setting to accommodate such bursts in processing.
    • Expire timeout- Number of milliseconds that an inactive connection can remain in the pool before it is closed and removed from the pool. The connection pool removes inactive connections until the number of connections in the pool is equal to the Minimum Pool Size. The timer for an inactive connection is reset when the connection is used.
      • If you set the Expire timeout value too high, the connection pool might accumulate numerous unused and inactive connections. This not only consumes local memory but also ties up a connection on your backend resource. This might have an adverse effect if your resource has a limited number of connections.
      • If you set the Expire timeout value too low, the performance might degrade because of the increased activity of creating and closing connections. The Expire timeout must be tuned along with the Minimum Pool Size setting to avoid excessive opening and closing of connections during processing.
  • Click Next. The Add account > Test and review page appears.

  • Verify the account details.
  • Click Test connection to verify the connection. A success message appears if the connectivity is successful.
  • Click Enable. A success message appears.
  • Click Done. You are redirected to the Connectors page and the newly created account appears in the Kafka drop-down list. You can use this account to run any Kafka custom action created under the same project.

Action Types

Kafka connector provides templates to perform the following types of actions:

  • Produce - Delivers a message to a specified Kafka topic.
  • Bulk Produce - Delivers a batch of messages asynchronously to a specified Kafka topic, waits for the batch of messages to complete publishing, and returns a list of metadata containing the status of each message.
  • Consume - Consumes a message from one or more specified Kafka topics.

Every action has an input/output signature. The input signature defines the parameters that are passed to the action at runtime and the output signature defines the results of the action performed.

Note:
  • You must use a Producer Account with Produce or Bulk Produce actions and a Consumer Account with Consume actions. Using a Consumer Account with Produce or Bulk Produce actions, or a Producer Account with Consume actions, might lead to undesirable results.
  • You must use Apache Kafka accounts with Apache Kafka actions and Confluent Kafka accounts with Confluent Kafka actions. Mismatching the distribution types with the action types might result in undesirable outcomes.

Creating Produce and Bulk Produce Actions

  1. In the Flow services page, click the "+" (Plus) icon to create a new flow service.

  2. Provide a name and description for the flow service. The description is optional. For example, PublishCustomerInfo.

  3. On the Flow service step, type Kafka to select the Kafka connector.

  4. In the Type to choose action box, click Add Custom Operation.

  5. Perform the following in the Account page to select the account that is used to access Kafka systems:
    • Enter the name and description for the action.
    • Select a Kafka account of Client type Producer.
    • Click Next.
  6. Select the Produce or Bulk Produce action in the Action page.
  7. Click Next.

  8. Provide values for the following fields in the Produce or Bulk Produce page:

    • Topic- Kafka topic to which the message must be published. If you choose a topic from the provided list, the message is delivered to that topic. If you specify a topic that does not exist and the Kafka broker is configured to allow the automatic creation of new topics (auto.create.topics.enable), a new topic is created and the message is published when the action is run successfully.
    • Partition- Partition where the message must be stored. If a valid partition number is specified, that partition is used when sending the message. If no partition is specified but a key is provided, a partition is selected by using a hash of the key. If neither a key nor a partition is specified, a partition is assigned in a round-robin fashion (see the sketch after this list).
    • Batch size- Maximum number of messages that must be sent in a batch to be published. This field is available if the Bulk Produce action is selected.
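
    The partition selection rules described above mirror the default Apache Kafka producer behavior. A minimal Java sketch of the three cases, using a hypothetical topic name; note that recent client versions replace strict round-robin with a sticky partitioner for records without keys.

      import org.apache.kafka.clients.producer.ProducerRecord;

      public class PartitionSelectionSketch {
          public static void main(String[] args) {
              String topic = "customer-events";  // hypothetical topic

              // Explicit partition: the specified partition (2) is used as-is.
              ProducerRecord<String, String> explicit =
                      new ProducerRecord<>(topic, 2, "key-1", "value-1");

              // Key without partition: the default partitioner hashes the key to choose a partition.
              ProducerRecord<String, String> byKey =
                      new ProducerRecord<>(topic, "key-1", "value-2");

              // Neither key nor partition: partitions are assigned round-robin
              // (a sticky partitioner in newer clients).
              ProducerRecord<String, String> unkeyed =
                      new ProducerRecord<>(topic, null, "value-3");

              System.out.println(explicit.partition()); // prints 2
          }
      }
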
  9. Click Next. The Summary page appears.

  10. Verify the details in the Summary page.

  11. Click Show Input/Output to view the signature.

  12. Click Done. The flow service editor page appears.

  13. Click the Save icon to save the service.

  14. If the key or msg input fields are of object type, do the following -
    • Click the input/output icon.
    • Create 2 input fields of type String. For example, key and message.
    • Click the View/Edit Mapping icon.
    • Map the fields.
    • Click the Save icon to save the service.
  15. Provide the input parameters.

  16. Click the Run icon to run the service.

Creating Consume Action

  1. In the Flow services page, click the "+" icon to create a new flow service.

  2. Provide a name and description for the flow service. The description is not mandatory. For example, ConsumeCustomerInfo.

  3. On the Flow service step, type Kafka to select the Kafka connector.

  4. In the Type to choose action box, click Add Custom Operation.

  5. Perform the following in the Account page to select the account that is used to access Kafka systems:
    • Enter the name and description for the action.
    • Select a Kafka account of Client type Consumer.
    • Click Next.
  6. Select the Consume action in the Action page.

  7. Click Next.

  8. In the Consume page, click the "+" icon to consume messages from the topic.
    • Topic- Kafka topic from which the message must be consumed. If you choose a topic from the provided list, the message is consumed from that topic. Also, you can create a new topic and consume from it, if the Kafka broker is configured to allow the automatic creation of new topics (auto.create.topics.enable).

    • Partition- Partition where the message is stored.

      Note:
      • Subscription- If the partition is not specified, the Consume action uses the subscribe API, which allows the Kafka broker to manage the active consumer threads in the consumer group. The Kafka broker also rebalances the partitions dynamically as consumers leave or join the group. Consumer groups managed by the Kafka broker are deleted if there are no active consumers with the consumer group ID polling the topic and the max.poll.interval.ms time has elapsed. The default value of max.poll.interval.ms is 5 minutes.

      • Manual- If the partition is specified, the Consume action uses the assign(partitions) API, which allows the Kafka connector to manage the active consumer threads in the consumer group. The Kafka connector also takes over the responsibility of rebalancing the partitions from the Kafka broker. A sketch of both modes follows this list.

    • Offset - Name of the field that specifies the offset of the partition from where the consumer must start consuming the messages. Noneditable field. You can pass the value at runtime.
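
    The two modes in the note above correspond to the Kafka consumer's subscribe and assign APIs. A minimal Java sketch, with a hypothetical topic, partition number, and starting offset:

      import java.time.Duration;
      import java.util.List;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;

      public class ConsumeModesSketch {
          // Subscription mode: no partition specified, so the broker manages group
          // membership and rebalances partitions as consumers join or leave.
          static void subscriptionMode(KafkaConsumer<String, String> consumer) {
              consumer.subscribe(List.of("customer-events"));
              consumer.poll(Duration.ofSeconds(1));
          }

          // Manual mode: the partition is assigned by the client, the broker performs
          // no rebalancing, and the starting offset can be supplied explicitly.
          static void manualMode(KafkaConsumer<String, String> consumer) {
              TopicPartition partition = new TopicPartition("customer-events", 0);
              consumer.assign(List.of(partition));
              consumer.seek(partition, 42L);   // hypothetical offset passed at runtime
              consumer.poll(Duration.ofSeconds(1));
          }
      }
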
  9. Note: For Apache and Confluent consumer actions, you must specify the partition at design time to view the input signature. If the partition is not specified, the input signature is not displayed, thus limiting the user's ability to pass input at runtime.
  10. Click Next.

  11. Verify the details in the Summary page.

  12. Click Show Input/Output to view the signature.

  13. Click Done. The flow service editor page appears.

  14. Click the Save icon to save the service.

  15. Click the Run icon to run the service.

Limitations

  • Kafka connector is not supported in Workflows

    The Kafka connector is not supported in workflows, even though it appears in the connector list. Currently, Kafka is supported only in flow services.

  • Input Signature Without Design-Time Partition Assignment

    When configuring Apache or Confluent consumer actions, the partition must be specified at design time to display the input signature. If the partition is not defined, the input signature does not appear, therefore, limiting your ability to provide input at runtime.

  • Security Protocol Transition Limitations While Editing Apache Kafka Account

    When editing an Apache Kafka account, the following restrictions apply when switching security protocols and must be considered during account configuration.

    • If the security protocol is set to PLAINTEXT, you cannot switch to any other secure modes (SASL_PLAINTEXT, SASL_SSL, or SSL).
    • If the security protocol is set to SASL_PLAINTEXT, SASL_SSL, or SSL, you can switch to any of these three secure modes but not to PLAINTEXT.