Operator MQTTSource

DEPRECATED: The com.ibm.streamsx.messaging.mqtt.MQTTSource operator is deprecated and is replaced by the com.ibm.streamsx.mqtt.MQTTSource operator in the com.ibm.streamsx.mqtt toolkit. The deprecated operator might be removed in a future release.

The MQTTSource operator subscribes to MQTT topics and receives messages when they are published to subscribed topics. You specify the list of topics that you want to subscribe to and the quality of service (QoS) for the topics when you connect to the MQTT server. You can update the list of topics and the QoS levels at run time by using the optional control input port.

Behavior in a consistent region

The MQTTSource operator cannot be part of a consistent region. To achieve tuple replay, connect the MQTTSource operator to a ReplayableStart operator from the Standard Toolkit. Messages are stored in an internal buffer before they are submitted to the output port. To reduce the chance of tuple loss during application failure, you can use the messageQueueSize parameter to control the size of this internal buffer.

Summary

Ports
This operator has 0 or more input ports and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 24 parameters.

Required: topics

Optional: appConfigName, clientID, commandTimeout, connection, connectionDocument, dataAttributeName, keepAliveInterval, keyStore, keyStorePassword, messageQueueSize, password, passwordPropName, period, qos, qosStr, reconnectionBound, serverURI, sslProtocol, topicOutAttrName, trustStore, trustStorePassword, userID, userPropName

Metrics
This operator reports 2 metrics.

Properties

Implementation
Java

Input Ports

Ports (0...)

This is the optional control port. You can use the control port to update information at run time, such as the connection information that the operator uses to connect to an MQTT server, the topics that the operator subscribes to, or the QoS levels of the subscribed topics.

Properties

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

This is the data port and is mandatory.

Properties

Ports (1)

The optional output port is an error port where the operator submits a single tuple for each failed message. The tuple contains a single attribute of type rstring or ustring, which contains the details of the error message.

Properties

Parameters

appConfigName

This parameter specifies the name of the application configuration that stores client credential information. Credentials specified in the application configuration override those specified with the userID and password parameters.

Properties
clientID

All clients connected to the same server must have a unique ID. This optional parameter specifies the client ID to use when connecting to an MQTT provider. If this parameter is not specified, the operator generates an ID.

Properties
commandTimeout

This optional parameter specifies the maximum time, in milliseconds, to wait for an MQTT action to complete. An MQTT action can include connecting to a server or publishing a message. A value of 0 causes the operator to wait indefinitely for an action to complete. A negative number causes a runtime error. If unspecified, the default value for this parameter is 0.
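The three cases described above (a positive value, 0, and a negative value) can be sketched as a small helper. This is a minimal illustration in Python, not the operator's Java implementation; the function name command_timeout_seconds is hypothetical.

```python
from typing import Optional


def command_timeout_seconds(command_timeout_ms: int = 0) -> Optional[float]:
    """Map a commandTimeout value (milliseconds) to a wait time in seconds.

    Returns None for 0, which stands for "wait indefinitely".
    Hypothetical helper, for illustration only.
    """
    if command_timeout_ms < 0:
        # The documentation states that a negative value is a runtime error.
        raise ValueError("commandTimeout must not be negative")
    if command_timeout_ms == 0:
        return None  # no time limit (also the documented default)
    return command_timeout_ms / 1000.0
```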

Properties
connection

Name of the connection specification of the MQTT element in the connection document.

Properties
connectionDocument

Path to the connection document. If unspecified, the path defaults to applicationDir/etc/connections.xml. If a relative path is specified, the path is relative to the application directory.

Properties
dataAttributeName

This optional parameter specifies the name of the attribute that holds the content of the message. If this parameter is not specified and the stream schema defines multiple attributes, the operator looks for an attribute named data and uses it as the data attribute. If the schema contains only a single attribute, the operator assumes that this attribute is the data attribute.
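The selection rule above can be sketched as follows. This is a minimal illustration in Python under stated assumptions, not the operator's Java implementation: the function name resolve_data_attribute is hypothetical, and raising an error when no attribute qualifies is an assumption, because the documentation does not describe that case.

```python
from typing import Optional, Sequence


def resolve_data_attribute(
    schema_attributes: Sequence[str],
    data_attribute_name: Optional[str] = None,
) -> str:
    """Pick the attribute that holds the message content (hypothetical helper)."""
    if data_attribute_name is not None:
        # Assumption: an explicitly named attribute must exist in the schema.
        if data_attribute_name not in schema_attributes:
            raise ValueError(f"attribute {data_attribute_name!r} is not in the schema")
        return data_attribute_name
    if len(schema_attributes) == 1:
        # A single-attribute schema: that attribute is the data attribute.
        return schema_attributes[0]
    if "data" in schema_attributes:
        # Multiple attributes: fall back to the attribute named "data".
        return "data"
    # Assumption: the documentation does not say what happens in this case.
    raise ValueError("no data attribute could be determined")
```

For example, a schema with the attributes topic and data resolves to data, while a schema with the single attribute payload resolves to payload.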

Properties
keepAliveInterval

This optional parameter, measured in seconds, sets the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available. By default, it is set to 60 seconds. A value of 0 disables it. A negative number causes a runtime error.

Properties
keyStore

This optional parameter of type rstring specifies the file that contains the public and private key certificates of the MQTT client. If a relative path is specified, the path is relative to the application directory.

Properties
keyStorePassword

This optional parameter of type rstring specifies the password to decrypt the encrypted keyStore file.

Properties
messageQueueSize

Specifies the size of the internal buffer for queueing incoming tuples. The default buffer size is 50 tuples.
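A bounded buffer of this kind can be sketched with Python's standard queue module. This is only an illustration of the concept, not the operator's Java implementation: the names make_message_buffer and drain are hypothetical, and rejecting a non-positive size is an assumption. The documentation does not state what the operator does when the buffer is full.

```python
import queue
from typing import List

DEFAULT_MESSAGE_QUEUE_SIZE = 50  # default stated in the documentation


def make_message_buffer(
    message_queue_size: int = DEFAULT_MESSAGE_QUEUE_SIZE,
) -> queue.Queue:
    """Create a buffer that holds at most message_queue_size items."""
    if message_queue_size <= 0:
        # Assumption: a non-positive size is treated as invalid here.
        raise ValueError("messageQueueSize must be positive")
    return queue.Queue(maxsize=message_queue_size)


def drain(buffer: queue.Queue) -> List[bytes]:
    """Remove and return everything that is currently buffered, in order."""
    items = []
    while True:
        try:
            items.append(buffer.get_nowait())
        except queue.Empty:
            return items
```

In this sketch, the items that drain would return are the messages that have been received but not yet submitted, so the buffer size bounds how many of them can be pending at any one time.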

Properties
password

This optional parameter sets the password to use for the connection. It must be specified when the userID parameter is used; otherwise, a compile-time error occurs.

Properties
passwordPropName

This parameter specifies the property name of the password in the application configuration. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile-time error occurs.

Properties
period

This optional parameter of type int64 specifies the time period, in milliseconds, that the operator waits before it tries to reconnect. The default value is 60000 ms.

Properties
qos

List of QoS levels for topic subscriptions. This parameter is mutually exclusive with the qosStr parameter.

Properties
qosStr

List of QoS levels in string format for topic subscriptions. Multiple comma-separated QoS values can be specified, for example "0, 1". This parameter is mutually exclusive with the qos parameter.
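The comma-separated format used by qosStr (and by the topics parameter) and the mutual exclusivity of qos and qosStr can be sketched as follows. This is a minimal illustration in Python, not the operator's Java implementation; the function names are hypothetical. The check against the values 0, 1, and 2 reflects the QoS levels defined by the MQTT protocol; whether the operator performs the same check is an assumption.

```python
from typing import List, Optional, Sequence

VALID_QOS = (0, 1, 2)  # QoS levels defined by the MQTT protocol


def parse_topics(topics: str) -> List[str]:
    """Split a comma-separated topic list such as "topic1, topic2"."""
    return [part.strip() for part in topics.split(",") if part.strip()]


def parse_qos(
    qos: Optional[Sequence[int]] = None,
    qos_str: Optional[str] = None,
) -> List[int]:
    """Return the QoS levels from either qos or qosStr, never both."""
    if qos is not None and qos_str is not None:
        raise ValueError("qos and qosStr are mutually exclusive")
    if qos_str is not None:
        levels = [int(part.strip()) for part in qos_str.split(",") if part.strip()]
    else:
        levels = list(qos or [])
    for level in levels:
        if level not in VALID_QOS:
            raise ValueError(f"invalid QoS level: {level}")
    return levels
```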

Properties
reconnectionBound

This optional parameter of type int32 specifies the number of successive connection attempts that the operator makes. Specify 0 for no retry, n for n retries, or -1 for infinite retries.
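The retry semantics of reconnectionBound, combined with the wait time from the period parameter, can be sketched as a simple loop. This is a minimal illustration in Python, not the operator's Java implementation. The function name connect_with_retry is hypothetical, the sketch assumes that the initial attempt does not count as a retry, and rejecting values below -1 is an assumption.

```python
import time
from typing import Callable


def connect_with_retry(
    connect: Callable[[], None],
    reconnection_bound: int,
    period_ms: int = 60000,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call connect() until it succeeds or the retries are used up.

    Returns the total number of attempts made. Re-raises the last
    ConnectionError when no retries remain.
    """
    if reconnection_bound < -1:
        # Assumption: values below -1 are treated as invalid here.
        raise ValueError("reconnectionBound must be -1, 0, or a positive number")
    attempts = 0
    while True:
        attempts += 1
        try:
            connect()
            return attempts
        except ConnectionError:
            retries_used = attempts - 1
            # -1 means retry forever; otherwise stop after the allowed retries.
            if reconnection_bound != -1 and retries_used >= reconnection_bound:
                raise
            sleep(period_ms / 1000.0)  # wait one period before the next attempt
```

The sleep argument exists only so that the waiting can be replaced in tests; with the default, the loop pauses for the full period between attempts.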

Properties
serverURI

This optional parameter of type rstring specifies the URI of the MQTT server to connect to. The serverURI has the following format: protocol://hostname or IP address:portnumber. The supported protocols are SSL and TCP. To use SSL authentication, set the protocol to ssl.
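The URI format above can be checked with a short parser. This is a minimal sketch in Python that uses the standard urllib.parse module; it is not the operator's Java implementation. The names ServerAddress and parse_server_uri are hypothetical, the host names and port numbers in the usage note are placeholders, and requiring an explicit port is an assumption based on the format shown.

```python
from typing import NamedTuple
from urllib.parse import urlsplit

SUPPORTED_PROTOCOLS = ("tcp", "ssl")  # protocols named in the documentation


class ServerAddress(NamedTuple):
    protocol: str
    host: str
    port: int


def parse_server_uri(server_uri: str) -> ServerAddress:
    """Split protocol://host:port into its parts and validate the protocol."""
    parts = urlsplit(server_uri)
    if parts.scheme not in SUPPORTED_PROTOCOLS:
        raise ValueError(f"unsupported protocol: {parts.scheme!r}")
    if not parts.hostname or parts.port is None:
        # Assumption: both host and port must be present.
        raise ValueError("serverURI must have the form protocol://host:port")
    return ServerAddress(parts.scheme, parts.hostname, parts.port)
```

For example, parse_server_uri("tcp://localhost:1883") yields the protocol tcp, the host localhost, and the port 1883, while a URI that starts with http:// is rejected.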

Properties
sslProtocol

This optional parameter of type rstring specifies the SSL protocol to use for making SSL connections. If this parameter is not specified, the default protocol TLSv1.2 is used.

Properties
topicOutAttrName

Name of the attribute on the output data stream to which the message topic is assigned.

Properties
topics

List of topics to subscribe to. Multiple comma-separated topics can be specified, for example "topic1, topic2".

Properties
trustStore

This optional parameter of type rstring specifies the name of the file that contains the public certificate of the trusted MQTT server. If a relative path is specified, the path is relative to the application directory.

Properties
trustStorePassword

This optional parameter of type rstring specifies the password to decrypt the encrypted trustStore file.

Properties
userID

This optional parameter sets the user name to use for the connection. It must be specified when the password parameter is used; otherwise, a compile-time error occurs.

Properties
userPropName

This parameter specifies the property name of the user name in the application configuration. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile-time error occurs.
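The rules that tie together appConfigName, userPropName, passwordPropName, userID, and password can be summarized in a short sketch. This is a minimal illustration in Python, not the operator's Java implementation. The function names are hypothetical, and the app_config mapping is a stand-in for the properties of the named application configuration. The operator reports the parameter errors at compile time; this sketch raises them as ordinary exceptions.

```python
from typing import Mapping, Optional, Tuple


def check_credential_params(params: Mapping[str, str]) -> None:
    """Apply the documented consistency rules to the operator parameters."""
    if "appConfigName" in params:
        for required in ("userPropName", "passwordPropName"):
            if required not in params:
                raise ValueError(f"{required} is required with appConfigName")
    if ("userID" in params) != ("password" in params):
        raise ValueError("userID and password must be specified together")


def resolve_credentials(
    params: Mapping[str, str],
    app_config: Optional[Mapping[str, str]] = None,
) -> Optional[Tuple[str, str]]:
    """Return (user, password), preferring the application configuration."""
    check_credential_params(params)
    if "appConfigName" in params and app_config is not None:
        # Credentials from the application configuration take precedence.
        return (
            app_config[params["userPropName"]],
            app_config[params["passwordPropName"]],
        )
    if "userID" in params:
        return params["userID"], params["password"]
    return None  # no credentials configured
```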

Properties

Metrics

isConnected - Gauge

Indicates whether the operator is currently connected to the MQTT server. A value of 1 indicates that it is connected, and a value of 0 indicates that it is not connected.

nConnectionLost - Counter

The number of lost connections to the current MQTT server.
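The difference between the two metric kinds can be sketched as follows: a gauge holds the current state, while a counter only increases. This is a minimal illustration in Python, not the operator's Java implementation. The class and method names are hypothetical, and the sketch does not model what happens to the counter when the operator switches to a different server.

```python
class ConnectionMetrics:
    """Tracks the isConnected gauge and the nConnectionLost counter."""

    def __init__(self) -> None:
        self.is_connected = 0       # gauge: 1 if connected, 0 if not
        self.n_connection_lost = 0  # counter: number of lost connections

    def on_connected(self) -> None:
        """Record a successful connection (or reconnection)."""
        self.is_connected = 1

    def on_connection_lost(self) -> None:
        """Record a lost connection."""
        self.is_connected = 0
        self.n_connection_lost += 1
```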

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.messaging.jar, ../../opt/downloaded/*