Operator KafkaConsumer
Specialized toolkits - release 4.3.1.0-prod20190605 > com.ibm.streamsx.messaging 5.4.1 > com.ibm.streamsx.messaging.kafka > KafkaConsumer
DEPRECATED: The com.ibm.streamsx.messaging.kafka.KafkaConsumer operator is deprecated and is replaced by the com.ibm.streamsx.kafka.KafkaConsumer operator in the com.ibm.streamsx.kafka toolkit. The deprecated operator might be removed in a future release.
This operator acts as a Kafka consumer receiving messages for one or more topics. Ordering of messages is only guaranteed per Kafka topic partition. Specify properties as described here: http://kafka.apache.org/documentation.html#configuration. If you are using Java security modules for login/authentication, ensure that they are compatible with IBM Java, as IBM Streams only runs with IBM Java. The SPL Attribute Types supported are rstring, ustring, and blob. The topic must be of type rstring/ustring, while the key and message must be of the same type (rstring, ustring, or blob).
Kafka 0.9 Server Support: By default this toolkit builds with Kafka 0.10 client JARs. The Kafka 0.10 client is not compatible with Kafka 0.9 brokers. To use this operator with Kafka 0.9 brokers, you must rebuild with the kafka-0.9 target after cleaning. From the toolkit root directory:
ant clean
ant kafka-0.9
AppConfig: You must provide properties for the operator using at least one of the following parameters: kafkaProperty, propertiesFile, or appConfigName. The hierarchy of properties goes: properties from appConfig beat out kafkaProperty parameter properties, which beat out properties from the propertiesFile. The threadsPerTopic parameter has been removed since the upgrade to Kafka 0.9. This is because the new KafkaConsumer is single-threaded. Due to a bug in Kafka (eventually getting resolved by KAFKA-1894), when authentication failure occurs or connection to Kafka brokers is lost, we will not be able to pick up new properties from the PropertyProvider. The workaround is to manually restart the KafkaConsumer PE after properties have been updated. New properties will then be picked up.
Behavior in a Consistent Region This operator can be used inside a consistent region. Operator driven and periodical checkpointing are supported. Partitions to be read from must be specified. Resetting to initial state is not supported because the intial offset cannot be saved and may not be present in the Kafka log. In the case of a reset to initial state after operator crash, messages will start being read from the time of reset.
Summary
- Ports
- This operator has 0 input ports and 1 output port.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 12 parameters.
Required: topic
Optional: appConfigName, appConfigPropertyName, consumerPollTimeout, jaasFile, jaasFilePropName, kafkaProperty, keyAttribute, messageAttribute, partition, propertiesFile, triggerCount
- Metrics
- This operator does not report any metrics.
Properties
- Implementation
- Java
- Assignments
- Java operators do not support output assignments.
- Ports (0)
-
Messages received from Kafka are sent on this output port.
- Properties
-
- Optional: false
Required: topic
Optional: appConfigName, appConfigPropertyName, consumerPollTimeout, jaasFile, jaasFilePropName, kafkaProperty, keyAttribute, messageAttribute, partition, propertiesFile, triggerCount
- appConfigName
-
This parameter specifies the name of application configuration that stores client properties, the property specified via application configuration is overridden by the properties file and kafkaProperty parameter. The hierarchy of properties goes: properties from appConfig beat out kafkaProperty parameter properties, which beat out properties from the propertiesFile.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- appConfigPropertyName
-
List of Kafka properties to retrieve from application configuration. The property name in the application configuration must the same as the Kafka property name. You may also supply jaasFile as a property name to act as the jaasFile parameter value.
- consumerPollTimeout
-
The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns immediately with any records that are available now. Must not be negative. Default is 100.
- Properties
-
- Type: int32
- Cardinality: 1
- Optional: true
- jaasFile
-
Location of the jaas file to be used for SASL connections. Jaas file is recommended to be stored in the etc directory. If a relative path is specified, the path is relative to the application directory.This sets the system property java.security.auth.login.config. This can also be set using the appConfig by specifying jaasFile=<jaas.conf location>.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- jaasFilePropName
-
This parameter specifies the property name of the jaasFile location in the application configuration. The default name is jaasFile.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- kafkaProperty
-
Specify a Kafka property "key=value" form. This will override any property specified in the properties file. The hierarchy of properties goes: properties from appConfig beat out kafkaProperty parameter properties, which beat out properties from the propertiesFile.
- keyAttribute
-
Name of the attribute for the key. Default is "key".
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- messageAttribute
-
Name of the attribute for the message. If this parameter is not specified, then by default the operator will look for an attribute named "message". If the "message" attribute is not found, a runtime error will be returned.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- partition
-
Partition to be subscribed to. 1 or more can be provided using comma separation. You may only specify 1 topic if you are specifying partitions. Ex: 0,2,3
- propertiesFile
-
Properties file containing kafka properties. Properties file is recommended to be stored in the etc directory. If a relative path is specified, the path is relative to the application directory. The hierarchy of properties goes: properties from appConfig beat out kafkaProperty parameter properties, which beat out properties from the propertiesFile.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- topic
-
Topic to be subscribed to. 1 or more can be provided using comma separation. Ex: "mytopic1","mytopic2"
- triggerCount
-
Approximate number of messages between checkpointing for consistent region. This is only relevant to operator driven checkpointing. Checkpointing is done after a buffer of messages is submitted, so actual triggerCount at checkpoint time may be slightly above specified triggerCount.
- Properties
-
- Type: int32
- Cardinality: 1
- Optional: true
- Operator class library