Operator RabbitMQSink

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.ibm.streamsx.rabbitmq/op$com.ibm.streamsx.rabbitmq$RabbitMQSink.svg

This operator acts as a RabbitMQ producer, sending messages to a RabbitMQ broker. The broker is assumed to be already configured and running. The incoming stream can have three attributes: message, routing_key, and messageHeader. The message is a required attribute. The exchange name, queue name, and routing key can be specified using parameters. If a specified exchange does not exist, it will be created as a non-durable exchange. All exchanges created by this operator are non-durable and auto-delete.This operator supports direct, fanout, and topic exchanges. It does not support header exchanges. Messages are non-persistent and sending will only be attempted once by default. This behavior can be modified using the deliveryMode and maxMessageSendRetries parameters.

Behavior in a Consistent Region This operator can participate in a consistent region. It cannot be the start of a consistent region.

AppConfig: The hierarchy of credentials goes: credentials from the appConfig beat out parameters (username and password). The valid key-value pairs in the appConfig are <userPropName>=<username> and <passwordPropName>=<password>, where <userPropName> and <passwordPropName> are specified by the corresponding parameters. This operator will only automatically recover with new credentials from the appConfig if automaticRecovery is set to true.

Summary

Ports
This operator has 1 input port and 0 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 28 parameters.

Optional: URI, appConfigName, automaticRecovery, deliveryMode, exchangeName, exchangeType, hostAndPort, keyStoreAlgorithm, keyStorePassword, keyStorePath, keyStoreType, maxMessageSendRetries, messageAttribute, messageSendRetryDelay, msgHeaderAttribute, password, passwordPropName, routingKeyAttribute, setNetworkRecoveryInterval, sslProtocol, trustStoreAlgorithm, trustStorePassword, trustStorePath, trustStoreType, useSSL, userPropName, username, virtualHost

Metrics
This operator reports 3 metrics.

Properties

Implementation
Java

Input Ports

Ports (0)
Properties

Parameters

This operator supports 28 parameters.

Optional: URI, appConfigName, automaticRecovery, deliveryMode, exchangeName, exchangeType, hostAndPort, keyStoreAlgorithm, keyStorePassword, keyStorePath, keyStoreType, maxMessageSendRetries, messageAttribute, messageSendRetryDelay, msgHeaderAttribute, password, passwordPropName, routingKeyAttribute, setNetworkRecoveryInterval, sslProtocol, trustStoreAlgorithm, trustStorePassword, trustStorePath, trustStoreType, useSSL, userPropName, username, virtualHost

URI

Convenience URI of form: amqp://userName:password@hostName:portNumber/virtualHost. If URI is specified, you cannot specify username, password, and host.

Properties
appConfigName

This parameter specifies the name of application configuration that stores client credentials, the property specified via application configuration is overridden by the application parameters. The hierarchy of credentials goes: credentials from the appConfig beat out parameters (username and password). The valid key-value pairs in the appConfig are <userPropName>=<username> and <passwordPropName>=<password>, where <userPropName> and <passwordPropName> are specified by the corresponding parameters. If the operator loses connection to the RabbitMQ server, or it fails authentication, it will check for new credentials in the appConfig and attempt to reconnect if they exist. The attempted reconnection will only take place if automaticRecovery is set to true (which it is by default).

Properties
automaticRecovery

Have connections to RabbitMQ automatically recovered. Default is true.

Properties
deliveryMode

Marks message as persistent(2) or non-persistent(1). Default as 1.

Properties
exchangeName

Name of the RabbitMQ exchange to send messages to. To use default RabbitMQ exchange, use empty quotes or do not specify: "".

Properties
exchangeType

Optional attribute. Name of the RabbitMQ exchange type. Default direct.

Properties
hostAndPort

List of host and port in form: "myhost1:3456","myhost2:3456".

Properties
keyStoreAlgorithm

Specifies the algorithm that was used to encrypt the keyStore. If not specified, the operator will use the JVM's default algorithm (typically IbmX509).

Properties
keyStorePassword

Specifies the password used to unlock the keyStore.

Properties
keyStorePath

Specifies the path to the keyStore file. This parameter is required if the useSSL parameter is set to true.

Properties
keyStoreType

Specifies the keyStore type. If not specified, the operator will use the JVM's default type (typically JKS).

Properties
maxMessageSendRetries

This optional parameter specifies the number of successive retries that are attempted for a message if a failure occurs when the message is sent. The default value is zero; no retries are attempted.

Properties
messageAttribute

Name of the attribute for the message. Default is "message".

Properties
messageSendRetryDelay

This optional parameter specifies the time in milliseconds to wait before the next delivery attempt. If the maxMessageSendRetries is specified, you must also specify a value for this parameter.

Properties
msgHeaderAttribute

Name of the attribute for the message_header. Schema of type must be Map<ustring,ustring>. Default is "message_header".

Properties
password

Password for RabbitMQ authentication.

Properties
passwordPropName

This parameter specifies the property name of password in the application configuration. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile time error occurs.

Properties
routingKeyAttribute

Name of the attribute for the routing_key. Default is "routing_key".

Properties
setNetworkRecoveryInterval

If automaticRecovery is set to true, this is the interval (in ms) that will be used between reconnection attempts. The default is 5000 ms.

Properties
sslProtocol

Specifies the SSL protocol to use. If not specified, the default value is "TLSv1.2".

Properties
trustStoreAlgorithm

Specifies the algorithm that was used to encrypt the trustStore. If not specified, the operator will use the JVM's default algorithm (typically IbmX509).

Properties
trustStorePassword

Specifies the password used to unlock the trustStore.

Properties
trustStorePath

Specifies the path to the trustStore file. This parameter is required if the useSSL parameter is set to true.

Properties
trustStoreType

Specifies the trustStore type. If not specified, the operator will use the JVM's default type (typically JKS).

Properties
useSSL

Specifies whether an SSL connection should be created. If not specified, the default value is false.

Properties
userPropName

This parameter specifies the property name of user name in the application configuration. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile time error occurs.

Properties
username

Username for RabbitMQ authentication.

Properties
virtualHost

Set Virtual Host. Default is null.

Properties

Metrics

isConnected - Gauge

Describes whether we are currently connected to the RabbitMQ server.

reconnectionAttempts - Counter

The accumulated number of times we have attempted to reconnect since the operator has been started.

reconnectionAttemptsLatestBatch - Counter

The number of times we have attempted to reconnect to establish the last successful connection.

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.rabbitmq.jar, ../../opt/downloaded/*