Operator KafkaConsumer


DEPRECATED: The com.ibm.streamsx.messaging.kafka.KafkaConsumer operator is deprecated and is replaced by the com.ibm.streamsx.kafka.KafkaConsumer operator in the com.ibm.streamsx.kafka toolkit. The deprecated operator might be removed in a future release.

This operator acts as a Kafka consumer that receives messages for one or more topics. Message ordering is guaranteed only within a single Kafka topic partition. Specify client properties as described at http://kafka.apache.org/documentation.html#configuration. If you use Java security modules for login or authentication, ensure that they are compatible with IBM Java, because IBM Streams runs only with IBM Java. The supported SPL attribute types are rstring, ustring, and blob. The topic attribute must be of type rstring or ustring, while the key and message attributes must share the same type (rstring, ustring, or blob).

Kafka 0.9 Server Support: By default this toolkit builds with Kafka 0.10 client JARs. The Kafka 0.10 client is not compatible with Kafka 0.9 brokers. To use this operator with Kafka 0.9 brokers, you must rebuild with the kafka-0.9 target after cleaning. From the toolkit root directory:

ant clean

ant kafka-0.9

AppConfig: You must provide properties for the operator using at least one of the following parameters: kafkaProperty, propertiesFile, or appConfigName. Precedence, from highest to lowest, is: properties from appConfig, then kafkaProperty parameter properties, then properties from the propertiesFile.

The threadsPerTopic parameter was removed with the upgrade to Kafka 0.9 because the new KafkaConsumer is single-threaded.

Due to a bug in Kafka (expected to be resolved by KAFKA-1894), the operator cannot pick up new properties from the PropertyProvider after an authentication failure or after the connection to the Kafka brokers is lost. The workaround is to manually restart the KafkaConsumer PE after the properties have been updated. The new properties are then picked up.
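The property precedence (appConfig over kafkaProperty over propertiesFile) can be pictured as layered merges of java.util.Properties objects. The following is a minimal sketch, not the operator's actual code: the class PropertyPrecedenceSketch, the method merge, and all host and group names are hypothetical and exist only for illustration.

```java
import java.util.Properties;

// Hypothetical illustration of the documented precedence; not toolkit code.
final class PropertyPrecedenceSketch {

    // Later sources overwrite earlier ones, so the effective order is
    // propertiesFile < kafkaProperty < appConfig.
    static Properties merge(Properties fromFile, Properties fromParam, Properties fromAppConfig) {
        Properties effective = new Properties();
        effective.putAll(fromFile);
        effective.putAll(fromParam);
        effective.putAll(fromAppConfig);
        return effective;
    }

    public static void main(String[] args) {
        // Invented sample values for the three property sources.
        Properties file = new Properties();
        file.setProperty("bootstrap.servers", "file-host:9092");
        file.setProperty("group.id", "file-group");

        Properties param = new Properties();
        param.setProperty("group.id", "param-group");

        Properties appConfig = new Properties();
        appConfig.setProperty("bootstrap.servers", "appconfig-host:9092");

        Properties effective = merge(file, param, appConfig);
        System.out.println(effective.getProperty("bootstrap.servers"));
        System.out.println(effective.getProperty("group.id"));
    }
}
```

In this sketch, bootstrap.servers resolves to the appConfig value (appconfig-host:9092) and group.id resolves to the kafkaProperty value (param-group), because the properties file is the lowest layer.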

Behavior in a Consistent Region

This operator can be used inside a consistent region. Operator-driven and periodic checkpointing are supported. The partitions to read from must be specified. Resetting to the initial state is not supported because the initial offset cannot be saved and might no longer be present in the Kafka log. If a reset to the initial state occurs after an operator crash, messages are read starting from the time of the reset.

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 12 parameters.

Required: topic

Optional: appConfigName, appConfigPropertyName, consumerPollTimeout, jaasFile, jaasFilePropName, kafkaProperty, keyAttribute, messageAttribute, partition, propertiesFile, triggerCount

Metrics
This operator does not report any metrics.

Properties

Implementation
Java

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

Messages received from Kafka are sent on this output port.


Parameters


appConfigName

This parameter specifies the name of the application configuration that stores client properties. Precedence, from highest to lowest, is: properties from appConfig, then kafkaProperty parameter properties, then properties from the propertiesFile.

appConfigPropertyName

List of Kafka properties to retrieve from the application configuration. The property name in the application configuration must be the same as the Kafka property name. You may also supply jaasFile as a property name to act as the jaasFile parameter value.

consumerPollTimeout

The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns immediately with any records that are available now. Must not be negative. Default is 100.

jaasFile

Location of the JAAS file to be used for SASL connections. It is recommended to store the JAAS file in the etc directory. If a relative path is specified, the path is relative to the application directory. This parameter sets the system property java.security.auth.login.config. The location can also be set through the appConfig by specifying jaasFile=<jaas.conf location>.
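As a rough illustration of the path handling described for jaasFile, the sketch below resolves a relative path against an application directory and sets java.security.auth.login.config. It is an assumption-laden example rather than the operator's implementation: the class JaasFileSketch, the method resolve, and the directory /opt/app are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; the operator's real implementation may differ.
final class JaasFileSketch {

    // Absolute paths are kept as-is; relative paths are resolved
    // against the application directory.
    static Path resolve(Path applicationDir, String jaasFile) {
        Path p = Paths.get(jaasFile);
        return (p.isAbsolute() ? p : applicationDir.resolve(p)).normalize();
    }

    public static void main(String[] args) {
        // "/opt/app" stands in for the application directory (assumed value).
        Path resolved = resolve(Paths.get("/opt/app"), "etc/jaas.conf");
        // JAAS locates its login configuration through this system property.
        System.setProperty("java.security.auth.login.config", resolved.toString());
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```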

jaasFilePropName

This parameter specifies the property name of the jaasFile location in the application configuration. The default name is jaasFile.

kafkaProperty

Specify a Kafka property in "key=value" form. This overrides any property with the same name that is specified in the properties file. The hierarchy of properties goes: properties from appConfig beat out kafkaProperty parameter properties, which beat out properties from the propertiesFile.
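One plausible way to turn "key=value" entries into client properties is sketched below. How the operator actually parses these strings is not stated here, so treat the details as assumptions: the sketch splits at the first "=" (so a value may itself contain "="), and the class KafkaPropertySketch and method parse are hypothetical names.

```java
import java.util.Properties;

// Hypothetical parser for "key=value" entries; not the toolkit's code.
final class KafkaPropertySketch {

    static Properties parse(String... entries) {
        Properties props = new Properties();
        for (String entry : entries) {
            // Assumption: split at the first '=' only.
            int eq = entry.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + entry);
            }
            props.setProperty(entry.substring(0, eq).trim(), entry.substring(eq + 1).trim());
        }
        return props;
    }

    public static void main(String[] args) {
        // Sample entries with invented values.
        Properties props = parse("bootstrap.servers=localhost:9092", "group.id=mygroup");
        System.out.println(props.getProperty("group.id"));
    }
}
```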

keyAttribute

Name of the attribute for the key. Default is "key".

messageAttribute

Name of the attribute for the message. If this parameter is not specified, then by default the operator will look for an attribute named "message". If the "message" attribute is not found, a runtime error will be returned.

partition

Partitions to be subscribed to. One or more can be provided using comma separation. You can specify partitions only when exactly one topic is specified. Ex: 0,2,3
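The two rules for this parameter (a comma-separated list, and a single topic whenever partitions are given) can be expressed as a small check. This is only a sketch under assumptions: the class PartitionSelectionSketch and its methods are hypothetical, and the operator may validate its parameters differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validation of the documented partition rules.
final class PartitionSelectionSketch {

    // Parses a comma-separated list such as "0,2,3".
    static List<Integer> parsePartitions(String csv) {
        List<Integer> partitions = new ArrayList<>();
        for (String token : csv.split(",")) {
            int partition = Integer.parseInt(token.trim());
            if (partition < 0) {
                throw new IllegalArgumentException("Negative partition: " + partition);
            }
            partitions.add(partition);
        }
        return partitions;
    }

    // Partitions may be given only together with exactly one topic.
    static void validate(List<String> topics, List<Integer> partitions) {
        if (!partitions.isEmpty() && topics.size() != 1) {
            throw new IllegalArgumentException(
                "Exactly one topic is required when partitions are specified");
        }
    }
}
```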

propertiesFile

Properties file containing Kafka properties. It is recommended to store the properties file in the etc directory. If a relative path is specified, the path is relative to the application directory. The hierarchy of properties goes: properties from appConfig beat out kafkaProperty parameter properties, which beat out properties from the propertiesFile.

topic

Topic to be subscribed to. 1 or more can be provided using comma separation. Ex: "mytopic1","mytopic2"

triggerCount

Approximate number of messages between checkpointing for consistent region. This is only relevant to operator driven checkpointing. Checkpointing is done after a buffer of messages is submitted, so actual triggerCount at checkpoint time may be slightly above specified triggerCount.
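To see why the message count at checkpoint time can exceed triggerCount, consider the simplified simulation below. It assumes messages arrive in buffers of known size and that the count is compared against triggerCount only after a whole buffer has been submitted. The class TriggerCountSketch, the method checkpointSizes, and the buffer sizes are hypothetical, not taken from the operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of operator-driven checkpointing.
final class TriggerCountSketch {

    // Returns the number of messages submitted before each checkpoint.
    static List<Integer> checkpointSizes(int[] bufferSizes, int triggerCount) {
        List<Integer> sizes = new ArrayList<>();
        int sinceLastCheckpoint = 0;
        for (int size : bufferSizes) {
            // The whole buffer is submitted before the count is checked.
            sinceLastCheckpoint += size;
            if (sinceLastCheckpoint >= triggerCount) {
                sizes.add(sinceLastCheckpoint);
                sinceLastCheckpoint = 0;
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Six buffers of 40 messages with triggerCount = 100.
        System.out.println(checkpointSizes(new int[] {40, 40, 40, 40, 40, 40}, 100));
        // prints [120, 120]
    }
}
```

With buffers of 40 messages and a triggerCount of 100, each checkpoint in this simulation covers 120 messages, since the threshold is crossed partway through the third buffer.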


Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.messaging.jar, ../../opt/downloaded/*, ../../opt/*