Operator KafkaProducer


The KafkaProducer operator is used to produce messages on Kafka topics. The operator can be configured to produce messages to one or more topics.

Supported Kafka Versions

This version of the toolkit supports Apache Kafka v0.10.2, v0.11.x, v1.0.x, v1.1.x, v2.0.x, and v2.1.x.

Kafka Properties

The operator implements Kafka's KafkaProducer API of the Kafka client version 2.1.1. As a result, it supports all Kafka properties that are supported by the underlying API. The producer properties for the Kafka producer 2.1.1 can be found in the Apache Kafka 2.1 documentation.

When you reference files within your application that are bundled with the Streams application bundle, for example an SSL truststore or a keytab file for Kerberos authentication, you can use the {applicationDir} placeholder in the property values. Before the configs are passed to the Kafka client, the {applicationDir} placeholder is replaced by the application directory of the application. Examples of how to use the placeholder are shown below.


ssl.truststore.location={applicationDir}/etc/myTruststore.jks
or

sasl.jaas.config=com.ibm.security.auth.module.Krb5LoginModule required \
    debug=true \
    credsType=both \
    useKeytab="{applicationDir}/etc/myKeytab.keytab" \
    principal="kafka/host.domain@EXAMPLE.DOMAIN.COM";
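The substitution the operator performs on such property values can be pictured with a small sketch. The class and method names below are illustrative only, not the toolkit's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class PlaceholderSketch {
    // Illustrative sketch: replace the "{applicationDir}" token in property
    // values before the properties are handed to the Kafka client.
    static Map<String, String> resolvePlaceholders(Map<String, String> props, String applicationDir) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            resolved.put(e.getKey(), e.getValue().replace("{applicationDir}", applicationDir));
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("ssl.truststore.location", "{applicationDir}/etc/myTruststore.jks");
        Map<String, String> resolved = resolvePlaceholders(props, "/opt/streams/myApp");
        System.out.println(resolved.get("ssl.truststore.location"));
        // prints /opt/streams/myApp/etc/myTruststore.jks
    }
}
```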

Properties can be specified in a file or in an application configuration. If specifying properties via a file, the propertiesFile parameter can be used. If specifying properties in an application configuration, the name of the application configuration can be specified using the appConfigName parameter.

The only property that the user is required to set is the bootstrap.servers property, which points to the Kafka brokers, for example kafka-0.mydomain:9092,kafka-1.mydomain:9092,kafka-2.mydomain:9092. All other properties are optional.
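A minimal properties file therefore needs only a single line; everything else shown below is an optional override (the host names are placeholders):

```
bootstrap.servers=kafka-0.mydomain:9092,kafka-1.mydomain:9092,kafka-2.mydomain:9092
# optional overrides -- can be omitted entirely
compression.type=lz4
linger.ms=100
```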

The operator sets some properties by default to enable users to quickly get started with the operator. The following lists which properties the operator sets by default:

client.id : Generated ID in the form P-J&lt;JobId&gt;-&lt;operator name&gt;

key.serializer : See the Automatic Serialization section below.

value.serializer : See the Automatic Serialization section below.

acks : Controls the durability of records that are sent. Adjusted to all when the operator is in a consistent region and the consistentRegionPolicy parameter is Transactional; otherwise acks is unchanged. The value 0 (fire and forget) is not recommended.

retries : When 0 is provided as retries and the consistentRegionPolicy parameter is Transactional, retries is adjusted to 1.

linger.ms : 100

batch.size : 32768

max.in.flight.requests.per.connection : 1 when the guaranteeOrdering parameter is true, limited to a maximum of 5 when provided and the consistentRegionPolicy parameter is Transactional, or 10 in all other cases.

enable.idempotence : true only when in a consistent region and the consistentRegionPolicy parameter is set to Transactional.

transactional.id : Randomly generated ID in the form tid-&lt;random_string&gt;, only when in a consistent region and the consistentRegionPolicy parameter is set to Transactional.

transaction.timeout.ms : Adjusted to a minimum of drain timeout + 120000 milliseconds, but not greater than 900000. Adjusted only when in a consistent region and the consistentRegionPolicy parameter is set to Transactional.

metric.reporters : com.ibm.streamsx.kafka.clients.producer.ProducerMetricsReporter is added to any provided reporters.

metrics.sample.window.ms : Adjusted to 10000

NOTE: Although these properties are adjusted, users can override any of them by explicitly setting the property value in either a properties file or in an application configuration. Not all properties or possible property values that can be specified for the Kafka producer version 2.1 are supported by all broker versions. An example is the Zstandard compression algorithm, which is supported with broker version 2.1 and above.
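The defaulting rules for max.in.flight.requests.per.connection listed above can be sketched as follows. The method is illustrative only and not the toolkit's actual code; the handling of an explicitly provided, non-transactional value is an assumption:

```java
public class MaxInFlightSketch {
    // Sketch of the defaulting rules described above for
    // max.in.flight.requests.per.connection.
    static int effectiveMaxInFlight(Integer provided, boolean guaranteeOrdering, boolean transactional) {
        if (guaranteeOrdering) {
            return 1;                      // strict ordering requires a single in-flight request
        }
        if (transactional && provided != null) {
            return Math.min(provided, 5);  // transactional producers are limited to 5
        }
        return provided != null ? provided : 10;  // operator default in all other cases
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxInFlight(null, true, false));   // 1
        System.out.println(effectiveMaxInFlight(8, false, true));      // 5
        System.out.println(effectiveMaxInFlight(null, false, false));  // 10
    }
}
```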

Kafka Properties via Application Configuration

Users can specify Kafka properties using Streams' application configurations. Information on configuring application configurations can be found in the Streams documentation topic Creating application configuration objects to securely store data. Each property set in the application configuration is loaded as a Kafka property. For example, to specify the bootstrap servers that the operator should connect to, create an app config property named bootstrap.servers.

Automatic Serialization

The operator automatically selects the appropriate serializers for the key and the message based on their SPL types. The following list shows which serializer is used for a particular type:

org.apache.kafka.common.serialization.StringSerializer : rstring

org.apache.kafka.common.serialization.IntegerSerializer : int32, uint32

org.apache.kafka.common.serialization.LongSerializer : int64, uint64

org.apache.kafka.common.serialization.FloatSerializer : float32

org.apache.kafka.common.serialization.DoubleSerializer : float64

org.apache.kafka.common.serialization.ByteArraySerializer : blob
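The type-to-serializer mapping can be pictured as a simple lookup table. The class below is illustrative only and mirrors the list above; it is not the toolkit's actual implementation:

```java
import java.util.Map;

public class SerializerSelectionSketch {
    // Illustrative lookup of the Kafka serializer class chosen for an
    // SPL attribute type, mirroring the list above.
    static final Map<String, String> SERIALIZER_BY_SPL_TYPE = Map.of(
        "rstring", "org.apache.kafka.common.serialization.StringSerializer",
        "int32",   "org.apache.kafka.common.serialization.IntegerSerializer",
        "uint32",  "org.apache.kafka.common.serialization.IntegerSerializer",
        "int64",   "org.apache.kafka.common.serialization.LongSerializer",
        "uint64",  "org.apache.kafka.common.serialization.LongSerializer",
        "float32", "org.apache.kafka.common.serialization.FloatSerializer",
        "float64", "org.apache.kafka.common.serialization.DoubleSerializer",
        "blob",    "org.apache.kafka.common.serialization.ByteArraySerializer"
    );

    public static void main(String[] args) {
        System.out.println(SERIALIZER_BY_SPL_TYPE.get("blob"));
        // prints org.apache.kafka.common.serialization.ByteArraySerializer
    }
}
```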

The following metrics created by the Kafka producer client are exposed as custom metrics:

connection-count : The current number of active connections.

compression-rate-avg : The average compression rate of record batches (as a percentage; 100 means no compression).

topic:compression-rate : The average compression rate of record batches for a topic (as a percentage; 100 means no compression).

record-queue-time-avg : The average time in ms that record batches spent in the send buffer.

record-queue-time-max : The maximum time in ms that record batches spent in the send buffer.

record-send-rate : The average number of records sent per second.

record-retry-total : The total number of retried record sends.

topic:record-send-total : The total number of records sent for a topic.

topic:record-retry-total : The total number of retried record sends for a topic.

topic:record-error-total : The total number of record sends that resulted in errors for a topic.

records-per-request-avg : The average number of records per request.

requests-in-flight : The current number of in-flight requests awaiting a response.

request-rate : The number of requests sent per second.

request-size-avg : The average size of requests sent.

request-latency-avg : The average request latency in ms.

request-latency-max : The maximum request latency in ms.

batch-size-avg : The average number of bytes sent per partition per request.

outgoing-byte-rate : The number of outgoing bytes sent to all servers per second.

bufferpool-wait-time-total : The total time in nanoseconds an appender waits for space allocation.

buffer-available-bytes : The total amount of buffer memory that is not being used (either unallocated or in the free list).

Checkpointing behavior in an autonomous region

A config checkpoint clause has no effect on the operator.

Consistent Region Strategy

The producer operator can participate in a consistent region. The operator cannot be the start of a consistent region. When the consistent region drains, the operator flushes all accumulated outstanding records to the Kafka cluster.

The operator supports non-transactional (default behavior) and transactional message delivery. The delivery can be controlled by the consistentRegionPolicy parameter.

Non-transactional delivery

If the operator crashes or is reset while in a consistent region, the operator writes all replayed tuples again. This ensures that every tuple sent to the operator is written to the topic(s). However, non-transactional message delivery implies that duplicate messages may be written to the topic(s).

Transactional delivery

Messages are always inserted into a topic within the context of a transaction. Transactions are committed when the operator checkpoints. If the operator crashes or is reset while in a consistent region, the operator aborts the ongoing transaction and writes all replayed tuples within a new transaction. External consumers configured with isolation.level=read_committed will not read the duplicates from the aborted transactions. Consumers that use a different isolation level will read duplicate messages as if they were produced without being part of a transaction.

For consumers that read the output topics with isolation.level=read_committed, the transactional producer minimizes the number of duplicate messages, with the downside that the produced messages become visible only at the checkpoint interval, which can be perceived as additional latency.

A consumer that reads the output topics with isolation.level=read_committed can read duplicate messages when the consistent region fails after the Kafka transaction has been committed, but before the region has reached a consistent state.

NOTE 1: Transactions in Kafka have an inactivity timeout, which is configured by the producer property transaction.timeout.ms. This timeout is adjusted by the operator to a minimum of the drain timeout plus 120 seconds. The maximum value of this property is limited by the server property transaction.max.timeout.ms, which has a default value of 900000. The operator opens a transaction when the first tuple of a consistent cut is processed. Every tuple being processed resets the inactivity timer.
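The timeout adjustment described in NOTE 1 amounts to simple arithmetic, sketched below. The method is illustrative only (not the toolkit's actual code), and the 900000 ms cap assumes the broker's default transaction.max.timeout.ms:

```java
public class TransactionTimeoutSketch {
    // Sketch of the adjustment in NOTE 1: transaction.timeout.ms is raised to at
    // least drain timeout + 120000 ms, but never above 900000 ms (the broker's
    // default transaction.max.timeout.ms).
    static long adjustedTransactionTimeoutMs(long configuredMs, long drainTimeoutMs) {
        long minimum = drainTimeoutMs + 120_000L;
        return Math.min(Math.max(configuredMs, minimum), 900_000L);
    }

    public static void main(String[] args) {
        // with a 180 s drain timeout, the effective minimum becomes 300000 ms
        System.out.println(adjustedTransactionTimeoutMs(60_000L, 180_000L)); // prints 300000
    }
}
```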

NOTE 2: For transactional delivery, the Kafka broker must have version 0.11 or higher. Older brokers do not support transactions.

Error Handling

Many exceptions thrown by the underlying Kafka API are considered fatal. In the event that Kafka throws an exception, the operator will restart. Some exceptions can be retried, such as those that occur due to network error. Users are encouraged to set the KafkaProducer retries property to a value greater than 0 to enable the producer's retry mechanism.

Summary

Ports
This operator has 1 input port and 0 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 13 parameters.

Optional: appConfigName, clientId, consistentRegionPolicy, flush, guaranteeOrdering, keyAttribute, messageAttribute, partitionAttribute, propertiesFile, timestampAttribute, topic, topicAttribute, userLib

Metrics
This operator does not define any metrics in its operator model; the custom metrics listed above are created at runtime.

Properties

Implementation
Java

Input Ports

Ports (0)

This port consumes the tuples to be written to the Kafka topic(s); each tuple received on this port is written to the topic(s).


Parameters

This operator supports 13 parameters.

Optional: appConfigName, clientId, consistentRegionPolicy, flush, guaranteeOrdering, keyAttribute, messageAttribute, partitionAttribute, propertiesFile, timestampAttribute, topic, topicAttribute, userLib

appConfigName

Specifies the name of the application configuration containing Kafka properties.

clientId

Specifies the client ID that should be used when connecting to the Kafka cluster. The value specified by this parameter will override the client.id Kafka property if specified.

Each operator must have a unique client ID. When operators are replicated by a parallel region, the channel ID is automatically appended to the clientId to make the client ID distinct across the parallel channels.

If this parameter is not specified and the client.id Kafka property is not specified, the operator will create an ID with the pattern C-J<job-ID>-<operator name> for a consumer operator, and P-J<job-ID>-<operator name> for a producer operator.

consistentRegionPolicy

Specifies the policy to use when in a consistent region.

When NonTransactional is specified, the operator guarantees that every tuple is written to the topic(s) at least once. When the consistent region resets, duplicates will most likely appear in the output topic(s). For consumers of the output topics, messages appear as they are produced.

When Transactional is specified, the operator writes tuples to the topic(s) within the context of a transaction. Transactions are committed when the operator checkpoints. This implies that downstream Kafka consumers may not see the messages until the operator checkpoints. Transactional delivery minimizes (but does not eliminate) duplicate messages for consumers of the output topics when they are configured with the consumer property isolation.level=read_committed. Consumers that read with the default isolation level read_uncommitted see all messages as they are produced. For these consumers, there is no difference between transactional and non-transactional message delivery.

For backward compatibility, the parameter value AtLeastOnce can also be specified, but it is deprecated and may be removed in a future version. AtLeastOnce is equivalent to NonTransactional.

This parameter is ignored if the operator is not part of a consistent region. The default value is NonTransactional. NOTE: Kafka brokers older than version v0.11 do not support transactions.

flush

Specifies the number of tuples after which the producer is flushed. When not specified, or when the parameter value is not positive, the flush interval is adaptively calculated to avoid queuing times significantly above five seconds.

Flushing the producer makes all buffered records immediately available to send to the server (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with the buffered records. When a small value is specified, the batching of tuples into server requests, and compression (if used), may become inefficient.

Under normal circumstances, this parameter should be used only when the adaptive flush control does not give the desired results, for example when the custom metric buffer-available-bytes becomes very small and record-queue-time-max or record-queue-time-avg becomes too high.

guaranteeOrdering

If set to true, the operator guarantees that the order of records within a topic partition is the same as the order of the processed tuples, even when sends are retried. This implies that the operator automatically sets the max.in.flight.requests.per.connection producer property to 1 if retries are enabled, i.e. when the retries property is not equal to 0, which is the operator's default.

If unset, the default value of this parameter is false, which means that the order can change due to retries.

keyAttribute

Specifies the input attribute that contains the Kafka key value. If not specified, the operator will look for an input attribute named key.

messageAttribute

Specifies the attribute on the input port that contains the message payload. If not specified, the operator will look for an input attribute named message. If this parameter is not specified and there is no input attribute named message, the operator will throw an exception and terminate.

partitionAttribute

Specifies the input attribute that contains the partition number that the message should be written to. If this parameter is not specified, the operator will look for an input attribute named partition. If the user does not indicate which partition the message should be written to, then Kafka's default partitioning strategy will be used instead (partition based on the specified partitioner or in a round-robin fashion).

propertiesFile

Specifies the name of the properties file containing Kafka properties. A relative path is always interpreted as relative to the application directory of the Streams application.

timestampAttribute

Specifies the attribute on the input port that contains the timestamp for the message. If not specified, the operator will look for an input attribute named messageTimestamp. If this parameter is not specified and there is no input attribute named messageTimestamp, the operator will use the timestamp provided by Kafka (broker config log.message.timestamp.type=[CreateTime|LogAppendTime]).

topic

Specifies the topic(s) that the producer should send messages to. The value of this parameter will take precedence over the topicAttribute parameter. This parameter will also take precedence if the input tuple schema contains an attribute named topic.

topicAttribute

Specifies the input attribute that contains the name of the topic that the message should be written to. If this parameter is not specified, the operator will look for an input attribute named topic. This parameter value is overridden if the topic parameter is specified.

userLib

Allows the user to specify paths to JAR files that should be loaded into the operator's classpath. This is useful if users want to provide their own partitioners. The value of this parameter can either point to a specific JAR file or to a directory. In the case of a directory, the operator loads all files ending in .jar onto the classpath. By default, this parameter loads all JAR files found in &lt;application_dir&gt;/etc/libs.
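The file-versus-directory behavior described above can be sketched as follows. The class and method names are illustrative, not the toolkit's actual code:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class UserLibSketch {
    // Illustrative sketch of the userLib resolution described above: a directory
    // contributes every contained *.jar file to the classpath, while a plain
    // file path contributes just that file.
    static List<File> resolveClasspathEntries(File path) {
        List<File> entries = new ArrayList<>();
        if (path.isDirectory()) {
            File[] jars = path.listFiles((dir, name) -> name.endsWith(".jar"));
            if (jars != null) {
                for (File jar : jars) entries.add(jar);
            }
        } else {
            entries.add(path);
        }
        return entries;
    }

    public static void main(String[] args) throws Exception {
        File dir = new File(System.getProperty("java.io.tmpdir"), "userlib-sketch");
        dir.mkdirs();
        new File(dir, "partitioner.jar").createNewFile();
        new File(dir, "readme.txt").createNewFile();
        // only the .jar file is contributed to the classpath
        for (File f : resolveClasspathEntries(dir)) System.out.println(f.getName());
    }
}
```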


Libraries

Operator class library
Library Path: ../../impl/java/bin, ../../opt/downloaded/*, ../../impl/lib/*