Operator RabbitMQSource

This operator acts as a RabbitMQ consumer, pulling messages from a RabbitMQ broker. The broker is assumed to be already configured and running. The outgoing stream can have three attributes: message, routing_key, and message_header. Only the message attribute is required. The exchange name, queue name, and routing key can be specified using parameters. If a specified exchange does not exist, it is created as a non-durable exchange. If a queue name is specified for a queue that already exists, all binding parameters (exchangeName and routingKey) are ignored. Exchange and routing key bindings are set up only for queues created by this operator. All exchanges and queues created by this operator are non-durable and auto-delete.

This operator supports direct, fanout, and topic exchanges. It does not support header exchanges.

Behavior in a Consistent Region

This operator cannot participate in a consistent region.

AppConfig: Credentials from the appConfig take precedence over the username and password parameters. The valid key-value pairs in the appConfig are <userPropName>=<username> and <passwordPropName>=<password>, where <userPropName> and <passwordPropName> are specified by the corresponding parameters. The operator automatically recovers with new credentials from the appConfig only if automaticRecovery is set to true.
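The precedence rule above can be illustrated with a minimal Python sketch. It is not the operator's Java implementation: the function name resolve_credentials is hypothetical, and the per-property fallback to the parameters when a key is missing from the appConfig is an assumption.

```python
from typing import Mapping, Optional, Tuple


def resolve_credentials(
    app_config: Optional[Mapping[str, str]],
    user_prop_name: Optional[str],
    password_prop_name: Optional[str],
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Return (username, password), letting appConfig values win over parameters."""
    resolved_user, resolved_password = username, password
    if app_config is not None:
        # The property names come from the userPropName / passwordPropName parameters.
        # Assumption: a key that is absent from the appConfig falls back to the parameter.
        if user_prop_name and user_prop_name in app_config:
            resolved_user = app_config[user_prop_name]
        if password_prop_name and password_prop_name in app_config:
            resolved_password = app_config[password_prop_name]
    return resolved_user, resolved_password
```

For example, with an appConfig of {"rmqUser": "alice", "rmqPass": "s3cret"} and parameters username "bob" and password "pw", the sketch resolves to alice and s3cret.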

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 27 parameters.

Optional: URI, appConfigName, automaticRecovery, exchangeName, exchangeType, hostAndPort, keyStoreAlgorithm, keyStorePassword, keyStorePath, keyStoreType, messageAttribute, msgHeaderAttribute, password, passwordPropName, queueName, routingKey, routingKeyAttribute, setNetworkRecoveryInterval, sslProtocol, trustStoreAlgorithm, trustStorePassword, trustStorePath, trustStoreType, useSSL, userPropName, username, virtualHost

Metrics
This operator reports 3 metrics.

Properties

Implementation
Java

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

Messages received are sent on this output port.

Properties

Parameters


URI

Convenience URI of the form: amqp://userName:password@hostName:portNumber/virtualHost. If URI is specified, you cannot also specify the username, password, or host.
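To make the pieces of this URI form concrete, here is a minimal Python sketch that splits such a URI into its components. It is an illustration only, not the operator's own parsing code; the function name parse_amqp_uri and the keys of the returned dictionary are hypothetical.

```python
from urllib.parse import unquote, urlsplit


def parse_amqp_uri(uri: str) -> dict:
    """Split amqp://userName:password@hostName:portNumber/virtualHost into parts."""
    parts = urlsplit(uri)
    if parts.scheme != "amqp":
        raise ValueError(f"expected an amqp:// URI, got {uri!r}")
    # The path is "/<virtualHost>"; an empty or bare "/" path means no virtual host given.
    vhost = unquote(parts.path[1:]) if len(parts.path) > 1 else None
    return {
        "username": unquote(parts.username) if parts.username is not None else None,
        "password": unquote(parts.password) if parts.password is not None else None,
        "host": parts.hostname,  # note: urlsplit lowercases the host name
        "port": parts.port,
        "virtualHost": vhost,
    }
```

Components that are missing from the URI come back as None, so a caller can tell an absent port or virtual host from an explicit one.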

Properties
appConfigName

This parameter specifies the name of the application configuration that stores client credentials. Credentials from the appConfig take precedence over the username and password parameters. The valid key-value pairs in the appConfig are <userPropName>=<username> and <passwordPropName>=<password>, where <userPropName> and <passwordPropName> are specified by the corresponding parameters. If the operator loses its connection to the RabbitMQ server or fails authentication, it checks the appConfig for new credentials and, if they exist, attempts to reconnect. The reconnection attempt takes place only if automaticRecovery is set to true (the default).

Properties
automaticRecovery

Specifies whether connections to RabbitMQ are automatically recovered. Default is true.

Properties
exchangeName

Name of the RabbitMQ exchange to bind the queue to. If consuming from an already existing queue, this parameter is ignored. To use the default RabbitMQ exchange, omit this parameter or specify an empty string: "".

Properties
exchangeType

Type of the RabbitMQ exchange. Supported types are direct, fanout, and topic. Default is direct.

Properties
hostAndPort

List of host and port pairs in the form: "myhost1:3456","myhost2:3456".
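As an illustration of the "host:port" form, the following Python sketch turns such a list into (host, port) pairs. The function name parse_host_and_port and the validation rules are assumptions, not the operator's actual behavior.

```python
from typing import Iterable, List, Tuple


def parse_host_and_port(entries: Iterable[str]) -> List[Tuple[str, int]]:
    """Convert entries such as "myhost1:3456" into (host, port) tuples."""
    addresses = []
    for entry in entries:
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected 'host:port', got {entry!r}")
        addresses.append((host, int(port)))
    return addresses
```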

Properties
keyStoreAlgorithm

Specifies the algorithm that was used to encrypt the keyStore. If not specified, the operator will use the JVM's default algorithm (typically IbmX509).

Properties
keyStorePassword

Specifies the password used to unlock the keyStore.

Properties
keyStorePath

Specifies the path to the keyStore file. This parameter is required if the useSSL parameter is set to true.

Properties
keyStoreType

Specifies the keyStore type. If not specified, the operator will use the JVM's default type (typically JKS).

Properties
messageAttribute

Name of the attribute for the message. Default is "message".

Properties
msgHeaderAttribute

Name of the attribute for the message_header. The attribute must be of type Map<ustring,ustring>. Default is "message_header".

Properties
password

Password for RabbitMQ authentication.

Properties
passwordPropName

This parameter specifies the name of the property that holds the password in the application configuration. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile-time error occurs.

Properties
queueName

Name of the queue. The main reason to specify it is to facilitate parallel consumption. If this parameter is not specified, a queue is created with a randomly generated name.

Properties
routingKey

Routing key or keys to bind the queue to. If you are connecting to an existing queue, these bindings are ignored.
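When exchangeType is topic, the routing keys used for binding act as patterns: in standard AMQP topic matching, words are separated by dots, "*" stands for exactly one word, and "#" stands for zero or more words. The matching is done by the broker, not by this operator. The Python sketch below only illustrates those rules; the function name topic_matches is hypothetical.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under AMQP topic rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words; try every possible split point.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # '*' or a literal word consumes exactly one word.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

For instance, a binding of "stock.*" matches "stock.nyse" but not "stock.nyse.ibm", whereas "stock.#" matches both.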

Properties
routingKeyAttribute

Name of the attribute for the routing_key. Default is "routing_key".

Properties
setNetworkRecoveryInterval

If automaticRecovery is set to true, this is the interval (in ms) that will be used between reconnection attempts. The default is 5000 ms.
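The effect of a fixed recovery interval can be sketched in Python as a retry loop that waits the same amount of time between attempts. This is a simplified model, not the operator's implementation: the function name reconnect_with_interval, the connect callback, and the max_attempts limit are all hypothetical (the source does not state any attempt limit).

```python
import time
from typing import Callable


def reconnect_with_interval(
    connect: Callable[[], bool],
    interval_ms: int = 5000,
    max_attempts: int = 10,  # hypothetical bound so the sketch terminates
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call connect() until it returns True; return the number of attempts made."""
    for attempt in range(1, max_attempts + 1):
        if connect():
            return attempt
        if attempt < max_attempts:
            # Wait the configured interval (given in milliseconds) before retrying.
            sleep(interval_ms / 1000.0)
    raise ConnectionError(f"gave up after {max_attempts} attempts")
```

The sleep function is passed in so that the waiting behavior can be replaced in tests without actually pausing.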

Properties
sslProtocol

Specifies the SSL protocol to use. If not specified, the default value is "TLSv1.2".

Properties
trustStoreAlgorithm

Specifies the algorithm that was used to encrypt the trustStore. If not specified, the operator will use the JVM's default algorithm (typically IbmX509).

Properties
trustStorePassword

Specifies the password used to unlock the trustStore.

Properties
trustStorePath

Specifies the path to the trustStore file. This parameter is required if the useSSL parameter is set to true.

Properties
trustStoreType

Specifies the trustStore type. If not specified, the operator will use the JVM's default type (typically JKS).

Properties
useSSL

Specifies whether an SSL connection should be created. If not specified, the default value is false.

Properties
userPropName

This parameter specifies the name of the property that holds the user name in the application configuration. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile-time error occurs.

Properties
username

Username for RabbitMQ authentication.

Properties
virtualHost

Name of the RabbitMQ virtual host to connect to. Default is null.

Properties

Metrics

isConnected - Gauge

Indicates whether the operator is currently connected to the RabbitMQ server.

reconnectionAttempts - Counter

The cumulative number of reconnection attempts made since the operator was started.

reconnectionAttemptsLatestBatch - Counter

The number of reconnection attempts that were needed to establish the most recent successful connection.
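How the three metrics relate to each other can be modeled with a small Python sketch. The class and method names are hypothetical, and resetting the latest-batch counter when a connection is lost is an assumption about the operator's behavior, not something stated in this documentation.

```python
class ReconnectionMetrics:
    """Toy model of isConnected, reconnectionAttempts, reconnectionAttemptsLatestBatch."""

    def __init__(self) -> None:
        self.is_connected = 0  # gauge: 1 while connected, 0 otherwise
        self.reconnection_attempts = 0  # counter: total since operator start
        self.reconnection_attempts_latest_batch = 0  # counter: current/last batch

    def on_connection_lost(self) -> None:
        self.is_connected = 0
        # Assumption: a new batch of attempts starts whenever the connection drops.
        self.reconnection_attempts_latest_batch = 0

    def on_reconnect_attempt(self, succeeded: bool) -> None:
        self.reconnection_attempts += 1
        self.reconnection_attempts_latest_batch += 1
        self.is_connected = 1 if succeeded else 0
```

Under this model the cumulative counter only grows, while the latest-batch counter reflects just the attempts that led to the current connection.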

Libraries

Operator class library
Library Path: ../../impl/lib/com.ibm.streamsx.rabbitmq.jar, ../../opt/downloaded/*