Adapter commands for consumers
Kafka adapter commands for consumers are valid for input data sources. For additional details, see the consumer configuration information in the Apache Kafka documentation.
Group ID
Specifies the unique string that identifies the consumer group that a consumer belongs to. This property enables Kafka group management, which is used when the -TP command specifies only topic names without topic partitions. The default is a null string. The corresponding adapter command is -GID groupID (or -GROUPID groupID).
Related Kafka consumer configuration: group.id
Offset Commit Strategy
Specifies the policy that the adapter uses to commit an offset when it consumes a message. The corresponding adapter command is -OCS (or -OFFSETCOMMITSTRATEGY).
- Manual: The Kafka adapter commits the offset when it commits the transaction, as defined by the setting on the input card of the map. This is the default.
- Auto: The Kafka cluster periodically commits the offset.
- Never: Neither the Kafka cluster nor the Kafka adapter commits the offset.
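The three strategies can be related to the standard Kafka consumer property enable.auto.commit. The mapping below is an illustrative sketch of that relationship, not the adapter's documented implementation:

```python
# Hypothetical sketch: how the three -OCS strategies could map onto the
# standard Kafka consumer property enable.auto.commit. Illustration only.

def consumer_overrides(strategy: str) -> dict:
    """Return illustrative Kafka consumer property overrides for a strategy."""
    strategy = strategy.lower()
    if strategy == "auto":
        # The Kafka cluster commits offsets periodically on the consumer's behalf.
        return {"enable.auto.commit": "true"}
    if strategy in ("manual", "never"):
        # The adapter (Manual) or nobody (Never) is responsible for commits,
        # so periodic auto-commit is disabled either way.
        return {"enable.auto.commit": "false"}
    raise ValueError(f"unknown offset commit strategy: {strategy}")

print(consumer_overrides("Auto"))    # {'enable.auto.commit': 'true'}
print(consumer_overrides("Manual"))  # {'enable.auto.commit': 'false'}
```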
Offset Commit Retry Count
Specifies the number of times to ignore offset commit failures and continue consuming messages. The corresponding adapter command is -OCRC (or -OFFSETCOMMITRETRYCOUNT).
When an offset commit operation fails and the specified count has not been reached, the adapter increments its internal counter of failed commits and continues consuming messages as if the commit had succeeded. If an error topic is specified and the value of this command is not 0, the adapter sends an error message with the topic, partition, and offset of the failed commit operation to the error topic. The adapter attempts to commit the pending offsets again the next time it is scheduled to perform the commit operation. The default value for this command is 0, which instructs the adapter to treat offset commit failures as fatal-severity errors and not attempt retries. The special value S specifies an unlimited number of retries.
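The bookkeeping described above can be sketched as follows. This is an illustration of the counting behavior (class and method names are hypothetical), not the adapter's source:

```python
# Illustrative sketch of the -OCRC bookkeeping: commit failures are tolerated
# until the configured count is reached, and the special value "S" means
# retry indefinitely.

class OffsetCommitRetryTracker:
    def __init__(self, retry_count):
        # retry_count is 0 (fail immediately), a positive int, or "S" (unlimited)
        self.unlimited = retry_count == "S"
        self.limit = 0 if self.unlimited else int(retry_count)
        self.failed = 0

    def on_commit_failure(self) -> str:
        """Return the action to take after a failed offset commit."""
        if self.unlimited:
            return "retry-later"
        self.failed += 1
        if self.failed > self.limit:
            return "fatal"
        return "retry-later"

tracker = OffsetCommitRetryTracker(2)
print(tracker.on_commit_failure())  # retry-later (failure 1 of 2 ignored)
print(tracker.on_commit_failure())  # retry-later (failure 2 of 2 ignored)
print(tracker.on_commit_failure())  # fatal (count exceeded)

strict = OffsetCommitRetryTracker(0)
print(strict.on_commit_failure())   # fatal (default 0: no retries)
```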
Skip Duplicates
Instructs the adapter to keep track of all messages that it consumed in the current process, and to ignore messages that it has previously consumed. The corresponding adapter command is -SD (-SKIPDUPLICATES).
If the error topic is configured, the adapter sends error messages that identify the detected duplicate messages (their topic, partition, and offset) to the error topic.
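The duplicate tracking can be pictured as a set of (topic, partition, offset) tuples kept for the lifetime of the process. A minimal sketch, mirroring the described behavior rather than the adapter's own code:

```python
# Minimal sketch of the -SD bookkeeping: remember every (topic, partition,
# offset) consumed in the current process and flag repeats as duplicates.

seen: set[tuple[str, int, int]] = set()

def is_duplicate(topic: str, partition: int, offset: int) -> bool:
    key = (topic, partition, offset)
    if key in seen:
        return True   # would be reported to the error topic as DUPLICATE
    seen.add(key)
    return False

print(is_duplicate("orders", 0, 42))  # False: first time seen
print(is_duplicate("orders", 0, 42))  # True: duplicate, skipped
```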
-OFFSETCOMMITRETRYCONSTRAINT value [value] (-OCC value [value])
Specifies constraints the adapter must meet when committing pending offsets. One of the following two values can be specified, or both can be specified, separated by a space:
apo: Stands for Assigned Partitions Only and means that the adapter consumer should commit only those pending offsets that belong to the partitions currently assigned to it. The offsets that belong to a partition that the consumer owned at some earlier point but that has subsequently been assigned to another consumer are skipped.
ssc: Stands for Skip Stale Commits and means that the adapter should not commit pending offsets on a partition that are older than the offsets already committed on the same partition for the same consumer group. Such offsets are considered stale. They can occur when the consumer loses a partition during a rebalance and another consumer in the same consumer group is assigned the partition, then consumes and commits offsets past the last offset consumed by the original consumer. If the original consumer later regains ownership of the partition, the offsets that it still considers pending for that partition are stale. When this constraint is specified and the consumer detects stale offsets, it does not commit them and discards (forgets) them.
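The effect of the two constraints can be illustrated as a filter over the pending offsets. The function below is a hypothetical sketch of that pruning logic, not the adapter's implementation:

```python
# Illustrative filter showing how the apo and ssc constraints could prune a
# set of pending offsets before a commit.

def filter_pending(pending, assigned, committed, apo=False, ssc=False):
    """pending/committed map (topic, partition) -> offset; assigned is a set."""
    result = {}
    for tp, offset in pending.items():
        if apo and tp not in assigned:
            continue  # apo: skip partitions no longer assigned to this consumer
        if ssc and offset <= committed.get(tp, -1):
            continue  # ssc: discard stale offsets at or behind the committed one
        result[tp] = offset
    return result

pending = {("t", 0): 10, ("t", 1): 5}
assigned = {("t", 0)}              # partition 1 was rebalanced away
committed = {("t", 0): 12}         # another consumer already committed past 10
print(filter_pending(pending, assigned, committed, apo=True, ssc=True))  # {}
```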
-LOGERRORMESSAGES (-LEM)
Specifies that Kafka error messages published to the error topic should also be written to the adapter trace file.
Error Message Format Version
Specifies the format of messages sent to the error topic. The corresponding adapter command is -EMFV version (or -ERRORMESSAGEFORMATVERSION version).
- 1: The default version. Each message is in the format t:p-o, where t, p, and o are the topic, partition, and offset of the consumed message for which the processing failed and for which the error record is produced to the error topic.
- 2: Each message is in JSON format: {"topic":"t","partition":"p","offset":"o","code":"c"}, where t, p, and o are the topic, partition, and offset of the consumed message for which the processing failed, and c is an error code indicating the reason for the failure. The code is one of the following:
  - DUPLICATE: The message was skipped because it was already consumed earlier for the same consumer group in the current process, and the -SKIPDUPLICATES (-SD) adapter command was specified.
  - COMMIT_FAILED: The message was processed successfully, but the offset commit operation for it failed.
  - ROLLBACK: An error was reported while processing the message, for example, when performing a database write operation in one of the map outputs in the same transaction scope in which the message was consumed.
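The two record layouts described above can be sketched as follows. The helper names are illustrative; only the message layouts come from this documentation:

```python
import json

# Sketch of the two documented error-record formats.

def error_record_v1(topic, partition, offset):
    # Version 1: plain "t:p-o" string
    return f"{topic}:{partition}-{offset}"

def error_record_v2(topic, partition, offset, code):
    # Version 2: JSON object with an error code
    return json.dumps(
        {"topic": topic, "partition": str(partition),
         "offset": str(offset), "code": code})

print(error_record_v1("orders", 3, 1001))
# orders:3-1001
print(error_record_v2("orders", 3, 1001, "COMMIT_FAILED"))
```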
Fetch Minimum Bytes
Specifies the minimum amount of data, in bytes, that must accumulate before the server returns the data to the consumer. The default data size is one byte, which means the server responds to a data request as soon as any data is available to return. The corresponding adapter command is -FMIB size (or -FETCHMINBYTES size).
Related Kafka consumer configuration: fetch.min.bytes
Fetch Maximum Bytes
Specifies the maximum amount of data, in bytes, that the server should return for a fetch request. The corresponding adapter command is -FMAB size (or -FETCHMAXBYTES size).
Related Kafka consumer configuration: fetch.max.bytes
Auto Offset Reset
Specifies the offset position to use automatically when no previous offset exists. The corresponding adapter command is -AOR (or -AUTOOFFSETRESET).
- Latest: Starts consuming new messages from the most recent offset. This is the default.
- Earliest: Starts consuming from the oldest available record by automatically resetting the offset to the earliest offset.
- None: Reports an error when no previous offset exists for the consumer group.
Related Kafka consumer configuration: auto.offset.reset
Synchronized
Specifies that the Launcher listener thread is to operate synchronously. The listener waits to notify a map of an event (a message added to a topic) until the map acknowledges that it processed the previous event. The corresponding adapter command is -SYNC (or -SYNCHRONIZED).
If the map fails, the listener does not report subsequent events unless the -ETP command is specified. With the -ETP command, when the Launcher listener records the failed event on the error topic, the listener is unblocked and proceeds to report new events.
Error Topic
Specifies the topic to which the adapter publishes error messages (the -ETP command). When the error is recorded depends on the execution environment:
- The Command Server records the error at the time of transaction rollback. In a map, the input card setting controls when a transaction failure is processed by the adapter.
- The Launcher records the error when the listener thread is blocked because it is waiting for the status of the event processing, as specified by the -SYNC command.
After recording the error, the listener resumes processing new events.
Isolation Level
Specifies how the consumer reads messages that were written transactionally:
- read_committed: Only messages from transactions that are committed (and messages that were not part of a transaction) are visible to consumers.
- read_uncommitted: All messages are visible to consumers, even if they were part of an aborted transaction. This is the default.
Related Kafka consumer configuration: isolation.level
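The Kafka consumer properties cited in the sections above can be collected into one configuration map. This is an illustrative sketch only: the group ID and the fetch sizes are example values, not values mandated by the adapter.

```python
# The Kafka consumer properties referenced in this section, gathered into one
# illustrative configuration. All values are examples.

consumer_config = {
    "group.id": "example-consumers",       # -GID / -GROUPID (example name)
    "fetch.min.bytes": "1",                # -FMIB / -FETCHMINBYTES (default 1)
    "fetch.max.bytes": "52428800",         # -FMAB / -FETCHMAXBYTES (example)
    "auto.offset.reset": "latest",         # -AOR / -AUTOOFFSETRESET (default)
    "isolation.level": "read_uncommitted", # default isolation level
}

# Sanity check: auto.offset.reset accepts only these three values.
assert consumer_config["auto.offset.reset"] in ("latest", "earliest", "none")
```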
Quantity
Specifies the number of messages to consume, or S to consume all available messages. The corresponding adapter command is -QTY (or -QUANTITY).
The default value is 1. If you specify a value other than 1, you must set the FetchAs input card setting to Burst and the FetchUnit input card setting to 1 in the map.
Listen
Specifies how long the consumer waits for messages to arrive:
- Seconds: The number of seconds that the consumer waits for messages to arrive.
- S: Wait for an unlimited time for messages to arrive. This is the default.
- 0: Consume all available messages and do not wait for new messages to arrive.
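The three settings above amount to a wait deadline. The hypothetical helper below shows them expressed as a poll timeout (the function name is illustrative):

```python
# Hypothetical helper: the three documented Listen settings as a poll timeout.
# "S" waits indefinitely, 0 drains what is available, and a positive number
# waits that many seconds.

def listen_timeout_seconds(value):
    if value == "S":
        return None      # block without a deadline (the default)
    seconds = int(value)
    if seconds == 0:
        return 0         # consume available messages only, do not wait
    return seconds       # wait up to this many seconds for new messages

print(listen_timeout_seconds("S"))   # None
print(listen_timeout_seconds("0"))   # 0
print(listen_timeout_seconds("30"))  # 30
```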
Logical Message Count
Specifies the number of Kafka messages to concatenate and return as a single logical message from the adapter. By default, the adapter returns each Kafka message as a separate logical message. To concatenate all available messages, specify 0. The corresponding adapter command is -LMC count (or -LOGICALMESSAGECOUNT count).
This command is valid only in logical message mode (-LMM command). Logical message mode is not valid in a Launcher scenario.
Logical Message Bytes
Specifies the logical message buffer-size limit, in bytes. The adapter buffers messages until the buffered data exceeds this limit, then returns all buffered messages as a single logical message. The corresponding adapter command is -LMS bytes (or -LOGICALMESSAGESIZE bytes).
This command is valid only in logical message mode (-LMM command). Logical message mode is not valid in a Launcher scenario.
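The grouping performed by -LMC and -LMS can be sketched as follows. This is an illustration of the described buffering, not adapter code, and it assumes simple byte concatenation:

```python
# Sketch of how -LMC and -LMS could group raw Kafka messages into logical
# messages: -LMC n concatenates n messages at a time, and -LMS closes a
# buffer once its byte size exceeds the limit.

def group_by_count(messages, count):
    """count == 0 means concatenate all available messages into one."""
    if count == 0:
        return [b"".join(messages)]
    return [b"".join(messages[i:i + count])
            for i in range(0, len(messages), count)]

def group_by_size(messages, limit):
    logical, buf, size = [], [], 0
    for m in messages:
        buf.append(m)
        size += len(m)
        if size > limit:          # return buffered messages once limit exceeded
            logical.append(b"".join(buf))
            buf, size = [], 0
    if buf:
        logical.append(b"".join(buf))
    return logical

msgs = [b"aa", b"bb", b"cc"]
print(group_by_count(msgs, 2))   # [b'aabb', b'cc']
print(group_by_size(msgs, 3))    # [b'aabb', b'cc']
```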
Deserialize Failure Mode
Specifies the action to take when a deserializer registered with the adapter throws an exception while deserializing Kafka messages, before the messages are provided to the consumer in the adapter. The corresponding adapter command is -DFM {fail | warn | ignore} (or -DESERIALIZEFAILUREMODE {fail | warn | ignore}).
- fail: The original deserialization exception is re-thrown. This is the default mode.
- warn: The deserialization error is logged (provided that a trace command is specified), and the value is deserialized as a zero-length byte array.
- ignore: Same as warn, except the error is not logged.
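The three modes can be illustrated as a wrapper around a failing deserializer. This sketch mirrors the described behavior (function names are hypothetical):

```python
import logging

# Illustrative wrapper for the three -DFM modes applied around a deserializer
# that throws.

def deserialize_with_mode(deserialize, raw: bytes, mode: str) -> bytes:
    try:
        return deserialize(raw)
    except Exception:
        if mode == "fail":
            raise                  # re-throw the original exception (default)
        if mode == "warn":
            logging.warning("deserialization failed; substituting empty value")
        return b""                 # warn/ignore: zero-length byte array

def bad(raw):
    # Stand-in for a registered deserializer that rejects the payload.
    raise ValueError("malformed record")

print(deserialize_with_mode(bad, b"x", "ignore"))  # b''
```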
Empty Key Substitution
Specifies the literal string value that the adapter presents to the map as the key of a message when the key is retrieved from the topic as a null or empty array value. The corresponding adapter command is -EKS value (or -EMPTYKEYSUBSTITUTION value).
This includes null values obtained for messages whose keys failed deserialization in the deserializer registered with the consumer, when the -DFM warn or -DFM ignore command is specified. For example, if the adapter processes Avro messages from a topic as JSON, and some of the keys are invalid because they do not comply with the associated Avro schema, the combination of commands -DFM ignore -EKS {} causes the adapter to ignore the deserialization errors and return nulls as the message keys, and then to replace those nulls with the literal {} before returning them from the consumer.
Empty Value Substitution
Specifies the literal string value that the adapter presents to the map as the value of a message when the value is retrieved from the topic as a null or empty array value. The corresponding adapter command is -EVS value (or -EMPTYVALUESUBSTITUTION value).
This includes null values obtained for messages that failed deserialization in the deserializer registered with the consumer, when the -DFM warn or -DFM ignore command is specified. For example, if the adapter processes Avro messages from a topic as JSON, and some of the messages are invalid because they do not comply with the associated Avro schema, the combination of commands -DFM ignore -EVS {} causes the adapter to ignore the deserialization errors and return nulls as the message values, and then to replace those nulls with the literal {} before returning them from the consumer.
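The substitution that -EKS and -EVS describe can be sketched in a few lines. This is an illustration of the documented behavior, not adapter code:

```python
# Sketch of the -EKS / -EVS substitution: a key or value that arrives as
# null (None) or as an empty byte array is replaced with the configured
# literal before the message is handed to the map.

def substitute(field, replacement):
    if field is None or field == b"":
        return replacement.encode()
    return field

# With -DFM ignore, a key that failed deserialization comes back as null;
# with -EKS {} it reaches the map as the literal {}.
print(substitute(None, "{}"))    # b'{}'
print(substitute(b"", "{}"))     # b'{}'
print(substitute(b"k1", "{}"))   # b'k1'
```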
Avro JSON Conversion Mode
Specifies the mode for converting Avro records to a JSON representation. The default mode, Simple (case-insensitive), produces the default simple JSON. The other mode, Strict (case-insensitive), produces JSON that complies with the Avro schema; an Avro schema must be specified when Strict mode is used. In most cases, the two modes produce an equivalent JSON representation. In some cases, such as when the Avro schema contains elements of union types, the JSON produced in Strict mode is more verbose, which makes it less convenient to process but allows it to be converted back to Avro in compliance with the same schema.
The corresponding adapter command is -AJCM mode (or -AVROJSONCONVERSIONMODE mode).
Avro JSON Encoder Class Name
Specifies the Java class to use for converting Avro to a JSON representation. This command is optional. If it is not specified, the adapter internally uses the default class org.apache.avro.io.JsonEncoder from the Apache Avro library. When the command is specified, the value must be the fully qualified name of a Java class that implements a public constructor that takes two arguments: an org.apache.avro.Schema that represents the Avro schema to use for the conversion, and a java.io.OutputStream that represents the stream to which the JSON content is written. The jar file that implements this class must be included in the classpath in use for the adapter.
The corresponding adapter command is -AJECN name (or -AVROJSONENCODERCLASSNAME name).