Kafka Producer
The Kafka Producer destination writes data to a Kafka cluster. The destination can also send responses to a microservice origin when used in a microservice pipeline. For information about supported versions, see Supported Systems and Versions.
When you configure a Kafka Producer, you define connection information, the partition strategy, and the data format to use. You can also configure Kafka Producer to determine the topic to write to at runtime.
The Kafka Producer passes data to partitions in the Kafka topic based on the partition strategy that you choose. You can optionally write a batch of records to the Kafka cluster as a single message. When you want the destination to send responses to a microservice origin within a microservice pipeline, you specify the type of response to send. The destination writes all user-defined record header attributes to Kafka as Kafka message headers.
You can add additional Kafka configuration properties as needed. You can configure the destination to use Kafka security features. You can also configure the destination to pass message key values stored in the record to Kafka as Kafka message keys.
You can configure the Kafka Producer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas that uses Kafka as its underlying storage mechanism.
You can also use a connection to configure the destination.
Broker List
The Kafka Producer connects to Kafka based on the topic and associated brokers that you specify. To ensure a connection in case a specified broker goes down, list as many brokers as possible.
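The following Python sketch shows why listing several brokers helps: if one listed broker is unreachable, a client can still start from another. The comma-separated `<host>:<port>` format and the helper names `parse_broker_list` and `first_reachable` are assumptions for illustration. The real Kafka client uses the listed brokers only to bootstrap and fetch cluster metadata, and its connection logic is more involved than this.

```python
from typing import Callable, List, Optional, Tuple

Broker = Tuple[str, int]  # (host, port)


def parse_broker_list(broker_list: str) -> List[Broker]:
    """Split a comma-separated '<host>:<port>' list into (host, port) pairs."""
    brokers: List[Broker] = []
    for entry in broker_list.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and extra spaces
        host, _, port = entry.rpartition(":")
        brokers.append((host, int(port)))
    return brokers


def first_reachable(brokers: List[Broker], is_up: Callable[[Broker], bool]) -> Optional[Broker]:
    """Return the first broker that answers, or None if every listed broker is down."""
    for broker in brokers:
        if is_up(broker):
            return broker
    return None


brokers = parse_broker_list("kafka1:9092, kafka2:9092,kafka3:9092")
down = {("kafka1", 9092)}
print(first_reachable(brokers, lambda b: b not in down))  # ('kafka2', 9092)
```

With a single listed broker, the same outage would leave no broker to connect to, which is the failure the guidance above is meant to avoid.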
Runtime Topic Resolution
Kafka Producer can write a record to the topic based on an expression. When Kafka Producer evaluates a record, it calculates the expression based on record values and writes the record to the resulting topic.
When performing runtime topic resolution, Kafka Producer can write to any topic by default. You can create an allowlist of topics to limit the number of topics Kafka Producer attempts to use. When you create an allowlist, any record that resolves to an unlisted topic is sent to the stage for error handling. Use an allowlist when record data might resolve to invalid topic names.
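Runtime topic resolution with an allowlist can be sketched in Python as follows. This is a minimal model, not the stage's implementation: the lambda stands in for a topic expression that reads record values, the `region` field and `orders_` prefix are hypothetical, and the `errors` list stands in for the stage's error handling.

```python
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

Record = Dict[str, Any]


def route_by_topic(
    records: List[Record],
    topic_expr: Callable[[Record], str],
    allowlist: Optional[Set[str]] = None,
) -> Tuple[Dict[str, List[Record]], List[Record]]:
    """Group records by resolved topic; records with unlisted topics go to errors."""
    by_topic: Dict[str, List[Record]] = {}
    errors: List[Record] = []
    for record in records:
        topic = topic_expr(record)  # evaluated once per record
        if allowlist is not None and topic not in allowlist:
            errors.append(record)  # stands in for the stage's error handling
            continue
        by_topic.setdefault(topic, []).append(record)
    return by_topic, errors


records = [
    {"id": 1, "region": "us"},
    {"id": 2, "region": "eu"},
    {"id": 3, "region": "us east"},  # resolves to a name with a space, not a valid Kafka topic
]
topic_expr = lambda r: "orders_" + r["region"]
by_topic, errors = route_by_topic(records, topic_expr, {"orders_us", "orders_eu"})
print(sorted(by_topic))           # ['orders_eu', 'orders_us']
print([r["id"] for r in errors])  # [3]
```

Without an allowlist, record 3 would be routed to the topic `orders_us east`, which is the kind of invalid name an allowlist is meant to catch.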
Partition Strategy
The partition strategy determines how to write data to Kafka partitions. You can use a partition strategy to balance the workload or to write data semantically.
- Round-Robin
- Writes each record to a different partition using a cyclical order. Use for load balancing.
- Random
- Writes each record to a randomly selected partition. Use for load balancing.
- Expression
- Writes each record to a partition based on the results of the partition expression. Use for semantic partitioning.
- Default
- Writes each record using the default partition strategy that Kafka provides.
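The round-robin, random, and expression strategies can be modeled with the Python sketch below. The class names are hypothetical, the expression is represented by a Python callable rather than a Data Collector expression, and rejecting out-of-range expression results is an assumption of this sketch. The default strategy is left out because it is delegated to Kafka's own partitioner.

```python
import itertools
import random
from typing import Any, Callable, Dict, Optional

Record = Dict[str, Any]


class RoundRobinPartitioner:
    """Cycle through partitions 0..n-1 in order, one record at a time."""

    def __init__(self, num_partitions: int) -> None:
        self._cycle = itertools.cycle(range(num_partitions))

    def partition(self, record: Record) -> int:
        return next(self._cycle)


class RandomPartitioner:
    """Pick a partition uniformly at random for each record."""

    def __init__(self, num_partitions: int, rng: Optional[random.Random] = None) -> None:
        self._n = num_partitions
        self._rng = rng or random.Random()

    def partition(self, record: Record) -> int:
        return self._rng.randrange(self._n)


class ExpressionPartitioner:
    """Use a per-record expression result as the partition number."""

    def __init__(self, num_partitions: int, expr: Callable[[Record], int]) -> None:
        self._n = num_partitions
        self._expr = expr

    def partition(self, record: Record) -> int:
        p = self._expr(record)
        if not 0 <= p < self._n:
            # Assumption: this sketch rejects results outside the partition range.
            raise ValueError(f"partition {p} out of range 0..{self._n - 1}")
        return p


rr = RoundRobinPartitioner(3)
print([rr.partition({}) for _ in range(5)])  # [0, 1, 2, 0, 1]

# Semantic partitioning: every record for the same customer lands in the same partition.
by_customer = ExpressionPartitioner(3, lambda r: r["customer_id"] % 3)
print(by_customer.partition({"customer_id": 7}))  # 1
```

Round-robin and random spread records without regard to content, which suits load balancing; the expression strategy keeps related records together, which suits semantic partitioning.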
Send Microservice Responses
The Kafka Producer destination can send responses to a microservice origin when you use the destination in a microservice pipeline. You can configure the destination to send one of the following types of response:
- All successfully written records
- Responses from the destination system. For information about the possible responses, see the documentation for the destination system.
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Producer destination.
When you add a Kafka configuration property, enter the exact property name and the value. The stage does not validate the property names or values.
If custom configurations conflict with other stage properties, the stage generates an error unless you select the Override Stage Configurations check box. With the check box selected, the custom configurations override other stage properties. For example, to use a SASL mechanism other than PLAIN or GSSAPI (Kerberos), which the stage provides, add the necessary properties as custom configuration properties and select the Override Stage Configurations check box. For information about the necessary properties, see the Kafka documentation.
The stage ignores user-defined values for the following Kafka configuration properties:
- key.serializer.class
- metadata.broker.list
- partitioner.class
- producer.type
- serializer.class
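The merge rules described above can be sketched in Python. This is a minimal model under stated assumptions: "conflict" is taken to mean the same property name with a different value, the listed properties are assumed to be dropped when supplied as custom properties, and `merge_properties` and `ConfigConflictError` are hypothetical names. `sasl.mechanism` is a standard Kafka client property; a real SCRAM setup also needs other properties, such as `sasl.jaas.config`, which the sketch omits.

```python
from typing import Dict

# Assumption: the properties listed above are dropped when set as custom properties.
IGNORED_PROPERTIES = frozenset({
    "key.serializer.class",
    "metadata.broker.list",
    "partitioner.class",
    "producer.type",
    "serializer.class",
})


class ConfigConflictError(ValueError):
    """Raised when a custom property conflicts with a stage property."""


def merge_properties(
    stage_props: Dict[str, str],
    custom_props: Dict[str, str],
    override_stage_configs: bool = False,
) -> Dict[str, str]:
    merged = dict(stage_props)
    for name, value in custom_props.items():
        if name in IGNORED_PROPERTIES:
            continue  # dropped in this sketch
        conflicts = name in stage_props and stage_props[name] != value
        if conflicts and not override_stage_configs:
            raise ConfigConflictError(f"{name!r} conflicts with a stage property")
        merged[name] = value  # names and values are passed through unvalidated
    return merged


stage = {"sasl.mechanism": "PLAIN"}
custom = {"sasl.mechanism": "SCRAM-SHA-256"}
print(merge_properties(stage, custom, override_stage_configs=True))
# {'sasl.mechanism': 'SCRAM-SHA-256'}
```

Calling `merge_properties(stage, custom)` without the override flag raises `ConfigConflictError`, mirroring the error the stage generates when the Override Stage Configurations check box is cleared.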
Writing Kafka Message Headers
When Data Collector uses a Kafka Java client version 0.11 or later, the Kafka Producer destination includes all user-defined record header attributes as Kafka message headers when writing messages to Kafka. User-defined record header attributes are those that you deliberately add to records as part of the pipeline logic.
The destination does not include internal attributes or automatically generated record header attributes in Kafka message headers.
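To include an automatically generated attribute in the Kafka message headers, create a user-defined attribute from it earlier in the pipeline. For example, in an Expression Evaluator processor, set the Header Attribute property to the name of the new attribute and set the Header Attribute Expression property to the following expression:
${record:attribute('<existing attribute name>')}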
Example
Say a pipeline includes an SFTP/FTP/FTPS Client origin that generates several record header attributes for information about the originating file for the record. The attributes include the file attribute for the path and name of the file, and the URI attribute for the URL used to access the remote server.
You want the Kafka Producer destination to include the URI attribute as a Kafka message header when writing to Kafka. You also want to include the timestamp of the approximate time of processing.
- To include the processing time in an attribute, you specify the following properties:
  - Header Attribute property: processingTime
  - Header Attribute Expression property: ${time:now()}
  The time:now function returns the current time from the Data Collector machine.
- To include the URL used to access the remote server, you specify the following properties:
  - Header Attribute property: originatingURL
  - Header Attribute Expression property: ${record:attribute('URI')}
  The expression evaluates to the value of the URI record header attribute generated by the SFTP/FTP/FTPS Client origin.
When the Kafka Producer destination writes messages to Kafka, it includes these user-defined record header attributes as message headers for each message. It does not include the file attribute or any other record header attributes that are automatically generated by the stages or the pipeline.
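The example can be modeled with the Python sketch below, which keeps generated and user-defined attributes apart and turns only the user-defined ones into headers. The `Record` class and helper names are hypothetical, and storing the processing time as a millisecond epoch string is an assumption; the string form that time:now produces in a header attribute is not described here. Header values are encoded as bytes because Kafka header values are byte arrays.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Record:
    # Attributes set automatically by origins or the pipeline (not sent as headers).
    generated_attributes: Dict[str, str]
    # Attributes deliberately added by pipeline logic (sent as headers).
    user_attributes: Dict[str, str] = field(default_factory=dict)


def kafka_headers(record: Record) -> List[Tuple[str, bytes]]:
    """Build Kafka message headers from user-defined attributes only."""
    return [(name, value.encode("utf-8")) for name, value in record.user_attributes.items()]


def add_example_attributes(record: Record, now_millis: int) -> None:
    """Mimic the two header attribute expressions from the example."""
    record.user_attributes["processingTime"] = str(now_millis)  # like ${time:now()}
    record.user_attributes["originatingURL"] = record.generated_attributes["URI"]  # like ${record:attribute('URI')}


record = Record({"file": "/incoming/orders.csv", "URI": "sftp://sftp.example.com:22"})
add_example_attributes(record, 1700000000000)
print(kafka_headers(record))
# [('processingTime', b'1700000000000'), ('originatingURL', b'sftp://sftp.example.com:22')]
```

Neither `file` nor `URI` appears in the headers, because both are generated attributes; the URL reaches Kafka only through the user-defined `originatingURL` copy.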
Kafka Security
You can configure the Kafka Producer destination to connect securely to Kafka through SSL/TLS, SASL, or both. For more information about the methods and details on how to configure each method, see Security in Kafka Stages.
Data Formats
The Kafka Producer destination writes data to Kafka based on the data format that you select.
- Avro
- The destination writes records based on the Avro schema.
- Binary
- The destination writes binary data from a single field in the record.
- Delimited
- The destination writes records as delimited data. When you use this data format, the root field must be of the List or List-Map type.
- JSON
- The destination writes records as JSON data. You can use one of the following formats:
- Array - Each message includes a single array. In the array, each element is a JSON representation of a record.
- Multiple objects - Each message includes multiple JSON objects. Each object is a JSON representation of a record.
- Protobuf
- The destination writes one record per message. It uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
- SDC Record
- The destination writes records in the SDC Record data format.
- Text
- The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
- XML
- The destination creates a valid XML document for each record. The destination requires the record to have a single root field that contains the rest of the record data. For details and suggestions for how to accomplish this, see Record Structure Requirement.
The destination can include indentation to produce human-readable documents. It can also validate that the generated XML conforms to the specified schema definition. Records that fail schema validation are handled based on the error handling configured for the destination.
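The difference between the two JSON formats can be sketched in Python for a message that holds several records, as when you write a batch of records as a single message. The function names are hypothetical, the compact separators are a choice made for readability, and separating multiple objects with newlines is an assumption of this sketch rather than a documented detail.

```python
import json
from typing import Any, Dict, List

Record = Dict[str, Any]


def as_json_array(records: List[Record]) -> str:
    """Array format: one JSON array whose elements are the records."""
    return json.dumps(records, separators=(",", ":"))


def as_multiple_objects(records: List[Record]) -> str:
    """Multiple objects format: one JSON object per record, newline-separated (assumed)."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


batch = [{"id": 1, "item": "pen"}, {"id": 2, "item": "ink"}]
print(as_json_array(batch))        # [{"id":1,"item":"pen"},{"id":2,"item":"ink"}]
print(as_multiple_objects(batch))  # each object printed on its own line
```

A consumer can parse the array format with a single JSON parse, while the multiple objects format needs one parse per object.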
Configuring a Kafka Producer Destination
Configure a Kafka Producer destination to write data to a Kafka cluster.