Operator MQTTSink

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.ibm.streamsx.mqtt/op$com.ibm.streamsx.mqtt$MQTTSink.svg

The MQTTSink operator creates a message for every tuple it receives on its input port and publishes the message to an MQTT server. You can use the topic parameter to specify the topic that you want to publish to, or you can use an attribute from the input tuple to identify the topic that you want to publish to at run time. The input tuple can optionally contain the topic and QoS for the message. This information is not considered to be part of the data.

Behavior in a consistent region

MQTTSink operator can be an operator within the reachability graph of a consistent region, but it can not be placed at start of a consistent region. Having a control port in a consistent region is not supported. The control information may not be replayed, persisted and restored correctly. You may need to manually replay the control signals to bring the operator back to a consistent state.

When the MQTTSink operator is in a consistent region, messages with qos=1 or qos=2 will be delivered to an MQTT provider at least once. Messages with qos=0 can still be lost as a result of application failures.

Summary

Ports
This operator has 1 or more input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 24 parameters.

Optional: appConfigName, clientID, commandTimeout, connection, connectionDocument, dataAttributeName, keepAliveInterval, keyStore, keyStorePassword, password, passwordPropName, period, qos, qosAttributeName, reconnectionBound, retain, serverURI, sslProtocol, topic, topicAttributeName, trustStore, trustStorePassword, userID, userPropName

Metrics
This operator reports 2 metrics.

Properties

Implementation
Java

Input Ports

Ports (0)

Input port 0 is a data port and is mandatory.

Properties
Ports (1...)

Input port 1 is an optional control port that can be used to update the configuration of the operator at run time.

Properties

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

This port is an error port where a single tuple is sent for each failed message. The tuple contains a single attribute of type rstring, which contains the details of the error message.

Properties

Parameters

This operator supports 24 parameters.

Optional: appConfigName, clientID, commandTimeout, connection, connectionDocument, dataAttributeName, keepAliveInterval, keyStore, keyStorePassword, password, passwordPropName, period, qos, qosAttributeName, reconnectionBound, retain, serverURI, sslProtocol, topic, topicAttributeName, trustStore, trustStorePassword, userID, userPropName

appConfigName

This parameter specifies the name of application configuration that stores client credential information, the credential specified via application configruation overrides the one specified with userID and password parameters.

Properties
clientID

All clients connected to the same server must have a unique ID. This optional parameter allows user to specify a client id to use when connecting to a MQTT provider. An ID will be generated by the operator if this parameter is not specified.

Properties
commandTimeout

This optional parameter is used to specify maximum time in millisecond to wait for an MQTT action to complete. A MQTT action can include connecting to a server, or publshing to a message. A value of 0 will cause the operator to wait indefinitely for an action to complete. A negative number will cause a runtime error. If unspecified, the default value for this parameter is 0.

Properties
connection

Name of the connection specification of the MQTT element in the connection document.

Properties
connectionDocument

Path to connection document. If unspecified, default to applicationDir/etc/connections.xml. If a relative path is specified, the path is relative to the application directory.

Properties
dataAttributeName

This optional parameter specifies the name of the attribute that is used to hold actual content of message, if not specified, in the case where multiple attributes are defined for the streams schema, the operator will look for attribute named data and use it as data attribute. In the case where the schema contains only a signle attribute, the operator will assume that the attribute is the data attribute

Properties
keepAliveInterval

This optional parameter, measured in seconds, sets the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available. By default, it is set to 60 seconds. A value of 0 will disable it. Negative number will cause a runtime error.

Properties
keyStore

This optional parameter of type rstring specifies the file that contains the public and private key certificates of the MQTT client. If a relative path is specified, the path is relative to the application directory.

Properties
keyStorePassword

This optional parameter of type rstring specifies the password to decrypt the encrypted keyStore file.

Properties
password

This optional parameter sets the password to use for the connection. Must be specified when userID parameter is used, or compile time error will occur

Properties
passwordPropName

This parameter specifies the property name of password in the application configuration. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile time error occurs.

Properties
period

This parameter specifies the time period in milliseconds the operator waits before it tries to reconnect. It is an optional parameter of type int64. Default value is 60000 ms.

Properties
qos

This optional parameter of type int32 specifies the quality of service that the MQTTSink operator provides for each MQTT message that it publishes to an MQTT topic. The valid values are 0, 1, and 2. The default value is 0.

Important: The quality of service is provided by the MQTT server to its subscribers. For each message that it publishes, the MQTTSink operator passes the value of the qos parameter as a part of the message header to the MQTT server.

If the qos parameter is set to 0, there is no guarantee that the message is received by the MQTT server or is handled by any of the message subscribers. The operator publishes the message at most once. No further attempts are made to publish the message again, and the message is lost in case of failures. If the qos value is set to 1 or 2, the operator publishes the message and waits until it receives an acknowledgment from the MQTT server before it discards the message. However, if the MQTTSink operator terminates unexpectedly while it is processing the input tuple or creating a message, or if there is a connection failure, the message is lost. There is no guarantee that the message is received by the MQTT server. If the MQTTSink operator publishes the topic successfully, the MQTT server ensures that the quality of service that is defined by the qos parameter is provided to the message subscribers.

Properties
qosAttributeName

Attribute name that contains the qos to publish the message with. This parameter is mutually exclusive with the "qos" parameter.

Properties
reconnectionBound

This optional parameter of type int32 specifies the number of successive connections that are attempted for an operator. Specify 0 for no retry, n for n number of retries, -1 for inifinite retry.

Properties
retain

Indicates if messages should be retained on the MQTT server. Default is false.

Properties
serverURI

This optional parameter of type rstring specifies the URI of the MQTT server to connect to. The serverURI has the following format: protocol://hostname or IP address:portnumber. The supported protocols are SSL and TCP. To use SSL authentication, set the protocol to ssl.

Properties
sslProtocol

This optional parameter of type rstring specifies the ssl protocol to use for making SSL connections. If this parameter is not specified, the default protocol TLSv1.2 will be used.

Properties
topic

This mandatory parameter of type rstring specifies the MQTT topic that you want to publish to. You can specify a static string, such as "traffic/freeway/880". This parameter is mutually exclusive with the "topicAttributeName" parameter.

Properties
topicAttributeName

Attribute name that contains the topic to publish the message with. This parameter is mutually exclusive with the "topic" parameter.

Properties
trustStore

This optional parameter of type rstring specifies the name of the file that contains the public certificate of the trusted MQTT server. If a relative path is specified, the path is relative to the application directory.

Properties
trustStorePassword

This optional parameter of type rstring specifies the password to decrypt the encrypted trustStore file.

Properties
userID

This optional parameter sets the user name to use for the connection. Must be specified when password parameter is used, or compile time error will occur

Properties
userPropName

This parameter specifies the property name of user name in the application configuration. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile time error occurs.

Properties

Metrics

isConnected - Gauge

Indicates if operator currently connected to MQTT server, a value of 1 indicates it is connected and a value of 0 indicates it is not connected.

nConnectionLost - Counter

The number of lost connections to current MQTT server.

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.mqtt.jar, ../../opt/downloaded/*