Apache Kafka Adapter
With the Apache Kafka adapter, Transformation Extender maps can connect to a Kafka cluster to consume and produce messages. You can also configure Transformation Extender Launcher watches to detect the arrival of new messages on Kafka topics and trigger maps to process those messages.
System requirements
See the release notes for the Kafka adapter requirements.
Command overview
In maps:
- Use adapter commands on input cards and GET functions to configure the adapter to consume messages from Kafka topics.
- Use adapter commands on output cards and PUT functions to configure the adapter to publish messages to Kafka topics.
Function | Adapter Command |
---|---|
Identify the Kafka cluster that the adapter connects to | -SRV |
Identify the topic to consume from or produce to | -TP -HDR |
Identify a consumer group or requesting application | -GID -CID |
Tune message delivery | -CT -BM -BS -LM -FMIB -FMAB -QTY -LSN |
Configure message delivery verification | -ACK -RET -EI -TID -IL |
Specify security protocol | -SP -SM -LCFL -KCFL -TSL -TSP -KSL -KSP -KP |
Control logical message size | -LMM -LMC -LMS |
Specify message offset within a partition | -OCS -AOR |
Override Kafka configuration options at run time | -APF -AP |
Process Launcher events synchronously | -SYNC |
Control error handling and logging | -ETP -T |
Many Kafka adapter commands are based on Apache Kafka consumer or producer configurations. Those adapter command descriptions include the name of the related Apache Kafka configuration. See the Apache Kafka documentation for details.
Apache Kafka adapter command aliases
Use KAFKA as the adapter command alias on input and output cards and in GET and PUT rules. For example:
Input source override execution command | -IAKAFKA card_num |
Output target override execution command | -OAKAFKA card_num |
Headers for data payloads
You can specify the optional -HDR command to prepend a header to each message. The adapter supports the header versions that are listed below. In the header structures, a number in brackets denotes the size of a field in bytes:
- [4] is a 32-bit integer
- [8] is a 64-bit integer
Header structure by version:
- Version 1
  - [4] Total header size (excluding this field): 2*4 + size(key)
  - [4] Header Version: 1
  - [4]key Key size followed by the key value
- Version 2
  - [4] Total header size (excluding this field): 4*4 + 2*8 + size(key) + size(topic)
  - [4] Header Version: 2
  - [4]key Key size followed by the key value
  - [4]topic Topic size followed by the topic
  - [4] Partition number
  - [8] Offset
  - [8] Timestamp
- Version 3
  - [4] Total header size (excluding this field): 4*4 + 2*8 + size(key) + size(topic) + sum(2*4 + size(hkey_i) + size(hvalue_i))
  - [4] Header Version: 3
  - [4]key Key size followed by the key value
  - [4]topic Topic size followed by the topic
  - [4] Partition number
  - [8] Offset
  - [8] Timestamp
  - [4] The size of the Kafka headers section (excluding this field), followed by an array of Kafka header key, value pairs in the following format:
    - [4]hkey Kafka header key size followed by Kafka header key
    - [4]hvalue Kafka header value size followed by Kafka header value
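The total header size formulas can be checked with a short calculation. The following Python sketch transcribes the three formulas as they are documented above; it is an illustration only, not part of the adapter. The function name `header_size` and its parameters are hypothetical, and the sketch assumes that size(key), size(topic), size(hkey_i), and size(hvalue_i) are byte counts.

```python
def header_size(version, key, topic=b"", kafka_headers=()):
    """Return the documented 'total header size' value for a header version.

    Hypothetical helper. key and topic are bytes; kafka_headers is a
    sequence of (hkey, hvalue) byte pairs. Sizes are assumed to be
    measured in bytes.
    """
    if version == 1:
        # Header version field + key size field + key bytes.
        return 2 * 4 + len(key)

    # Versions 2 and 3 share four 4-byte fields and two 8-byte fields.
    size = 4 * 4 + 2 * 8 + len(key) + len(topic)
    if version == 2:
        return size
    if version == 3:
        # Each Kafka header adds two 4-byte size fields plus its key and value.
        return size + sum(2 * 4 + len(k) + len(v) for k, v in kafka_headers)

    raise ValueError(f"unsupported header version: {version}")
```

For example, a 3-byte key gives a version 1 header size of 2*4 + 3 = 11, and adding a 6-byte topic gives a version 2 header size of 4*4 + 2*8 + 3 + 6 = 41.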
The message header is always followed by the payload size and payload content:
[4]payload Payload size followed by payload content
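To make the layout concrete, the following Python sketch builds and parses a message with a version 2 header followed by the payload. It is a minimal illustration under stated assumptions, not the adapter's implementation: the byte order (big-endian) and the use of signed integers are assumptions that the description above does not confirm, and the function names `pack_v2` and `unpack_v2` are hypothetical.

```python
import struct

# Assumption: fields are big-endian signed integers. Verify the byte
# order against real adapter output before relying on this layout.
INT32 = ">i"
INT64 = ">q"


def pack_v2(key, topic, partition, offset, timestamp, payload):
    """Build a version 2 header followed by the payload size and payload."""
    body = struct.pack(INT32, 2)                    # header version
    body += struct.pack(INT32, len(key)) + key      # [4]key
    body += struct.pack(INT32, len(topic)) + topic  # [4]topic
    body += struct.pack(INT32, partition)           # partition number
    body += struct.pack(INT64, offset)              # offset
    body += struct.pack(INT64, timestamp)           # timestamp
    # The total header size excludes the size field itself.
    header = struct.pack(INT32, len(body)) + body
    return header + struct.pack(INT32, len(payload)) + payload


def unpack_v2(data):
    """Parse bytes produced by pack_v2 into a dictionary of fields."""
    pos = 0

    def read_number(fmt):
        nonlocal pos
        (value,) = struct.unpack_from(fmt, data, pos)
        pos += struct.calcsize(fmt)
        return value

    def read_sized_bytes():
        nonlocal pos
        length = read_number(INT32)
        chunk = data[pos:pos + length]
        pos += length
        return chunk

    header_size = read_number(INT32)
    version = read_number(INT32)
    if version != 2:
        raise ValueError(f"expected header version 2, got {version}")
    fields = {
        "key": read_sized_bytes(),
        "topic": read_sized_bytes(),
        "partition": read_number(INT32),
        "offset": read_number(INT64),
        "timestamp": read_number(INT64),
    }
    # The header ends header_size bytes after the 4-byte size field.
    if pos != 4 + header_size:
        raise ValueError("header size field does not match parsed fields")
    fields["payload"] = read_sized_bytes()
    return fields
```

A round trip such as `unpack_v2(pack_v2(b"k1", b"orders", 0, 42, 1700000000000, b"hello"))` returns the original field values. The size check inside `unpack_v2` confirms that the parsed fields add up to 4*4 + 2*8 + size(key) + size(topic), as the version 2 formula states.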