Operator XMSSink

The XMSSink operator takes tuples from IBM InfoSphere Streams and sends them as messages to a WebSphere MQ queue or topic.

The attributes of the incoming tuple from InfoSphere Streams can have one or more of the following data types: int8, uint8, int16, uint16, int32, uint32, int64, float32, float64, boolean, blob, or rstring. The input tuple is serialized into a WebSphere MQ message as a map, stream, bytes, xml, wbe, or wbe22 message, according to the value of the message_class attribute in the connection specifications document. An additional empty value can be specified in the message_class attribute, in which case the operator constructs an empty JMS or XMS message. The empty message class cannot be used with a native schema.
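The message class rules above can be modelled in a few lines. The following Python sketch is illustrative only and is not the operator's C++ implementation; the function name `check_message_class` is hypothetical, and the literal spelling `"empty"` for the empty message class is an assumption.

```python
# Message classes named in the text. The literal "empty" is an assumed spelling
# for the "additional empty value" that the text describes.
MESSAGE_CLASSES = {"map", "stream", "bytes", "xml", "wbe", "wbe22", "empty"}


def check_message_class(message_class, has_native_schema):
    """Return a list of problems; an empty list means the combination is allowed."""
    problems = []
    if message_class not in MESSAGE_CLASSES:
        problems.append(f"invalid message class: {message_class!r}")
    elif message_class == "empty" and has_native_schema:
        # The empty message class cannot be combined with a native schema.
        problems.append("message class 'empty' cannot be used with a native schema")
    return problems
```

Both conditions in this sketch correspond to compile-time errors in the Exceptions section.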

Behavior in a consistent region

The XMSSink operator can be part of a consistent region, but it cannot be at the start of a consistent region.

Exceptions

The following list describes the common types of exceptions that can occur:
  • Run time errors that halt the operator execution.
    • The XMSSink operator throws an exception and terminates in the following cases:
      • During the initial connect attempt, the reconnectionPolicy is set to NoRetry and the operator does not establish a connection; or, during transient connection failures, the reconnectionPolicy is set to BoundedRetry and the operator does not establish a connection after the number of attempts that is specified in the reconnectionBound parameter. Subsequent data is lost.
      • The queue name is unknown.
      • The user is not an authorized user of the queue.
      • The queue manager name is unknown.
      • The operator is unable to connect to the host.
      • The operator is unable to connect to the port.
  • Run time errors that cause the message to be dropped and an error message to be logged.
    • The XMSSink operator throws an exception and discards the message in the following circumstances:
      • The data that is being written is longer than the maximum message length specified in the queue in WebSphere MQ.
    • The discarded message is not sent to the WebSphere MQ queue or topic.
  • Compile time errors
    • The XMSSink operator throws a compile-time error in the following cases:
      • A native schema is specified and the message class is empty.
      • There is a mismatch between the data type of one or more attributes in the native schema and the data type of attributes in the input stream.
      • One or more native schema attributes do not have a matching attribute in the input stream schema.
      • The connections.xml file does not conform to connections.xsd.
      • The connectionDocument parameter refers to a nonexistent connections file.
      • The connectionDocument parameter is not specified and there is no connections.xml file inside the etc directory.
      • An invalid value is specified for the message class.
      • The access parameter does not refer to an existing access specification in the connections document.
      • The connection parameter does not refer to an existing connection specification in the connections document.
      • A length is specified for a native schema attribute that does not have a string or blob data type.
      • A negative length is specified for a string or blob attribute in the native schema for a map, stream, xml, wbe, or wbe22 message class.
      • A negative length other than -2, -4, or -8 is specified for a string or blob attribute in the native schema for a bytes message class.

In all of these cases, the operator logs the exception in the console logs. The information about the exception is also sent to the optional error output port, unless it is a run time error that halts the operator or a compile time error.
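The three length-related compile-time checks in the list above can be read as one small decision procedure. The following Python sketch models them under stated assumptions: `check_length` is a hypothetical name, "string" in the text is taken to mean the rstring type, and `None` stands for "no length specified". It is not the operator's actual validation code.

```python
LENGTH_TYPES = {"rstring", "blob"}   # "string or blob" in the text; rstring is assumed
BYTES_NEGATIVE_LENGTHS = {-2, -4, -8}  # the only negative lengths accepted for bytes


def check_length(attr_type, length, message_class):
    """Return an error string for an invalid length, or None if it is acceptable."""
    if length is None:
        return None  # no length given, nothing to check
    if attr_type not in LENGTH_TYPES:
        return "a length is only allowed on string or blob attributes"
    if length < 0:
        if message_class == "bytes":
            if length not in BYTES_NEGATIVE_LENGTHS:
                return "for bytes, a negative length must be -2, -4, or -8"
        else:
            # map, stream, xml, wbe, and wbe22 accept no negative lengths
            return f"negative length is not allowed for message class {message_class}"
    return None
```

For example, `check_length("rstring", -4, "bytes")` is accepted, while the same length with the map message class is reported as an error.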

Examples

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 6 parameters.

Required: access, connection

Optional: connectionDocument, period, reconnectionBound, reconnectionPolicy

Metrics
This operator reports 3 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The XMSSink operator is configurable with a single input data port, which is required. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports

Assignments
This operator requires that assignments to output attributes evaluate at compile time to a constant.
Ports (0)

The XMSSink operator is configurable with an optional port that submits a tuple when an error occurs. The optional port is mutating and its punctuation mode is Free.

The error output port contains an optional first attribute which contains the input tuple that caused the error and a second attribute of type rstring that details the error message. Only one error message is sent per failed tuple.
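To make the shape of the error tuple concrete, the following Python sketch models a sink that drops a message longer than the queue's maximum message length, counts the failure, and emits exactly one error tuple for it. The class `SinkModel` and all of its fields are hypothetical names chosen for illustration, and the wording of the error message is invented; the sketch does not reproduce the operator's implementation.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class SinkModel:
    """Toy model of the drop-and-report behavior; not the real operator."""
    max_message_length: int            # stands in for the queue's maximum message length
    include_input_tuple: bool = True   # the first error attribute is optional
    sent: List[bytes] = field(default_factory=list)
    errors: List[tuple] = field(default_factory=list)
    n_failed_inserts: int = 0          # mirrors the nFailedInserts metric

    def write(self, input_tuple: Any, payload: bytes) -> bool:
        """Send the payload, or drop it and record one error tuple."""
        if len(payload) > self.max_message_length:
            self.n_failed_inserts += 1
            message = (f"message of {len(payload)} bytes exceeds "
                       f"maximum length {self.max_message_length}")
            # One error tuple per failed input tuple: (input tuple, message) or (message,)
            if self.include_input_tuple:
                self.errors.append((input_tuple, message))
            else:
                self.errors.append((message,))
            return False
        self.sent.append(payload)
        return True
```

A dropped message never reaches the `sent` list, which corresponds to the statement that the discarded message is not sent to the WebSphere MQ queue or topic.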

Parameters

This operator supports 6 parameters.

Required: access, connection

Optional: connectionDocument, period, reconnectionBound, reconnectionPolicy

access

This mandatory parameter identifies the access specification name.

connection

This mandatory parameter identifies the name of the connection specification that contains an XMS element.

connectionDocument

This optional parameter specifies the path name of the file that contains the connection and access specifications, which are identified by the connection and access parameters. If the parameter is specified, it must have exactly one value that is a string constant. If the parameter is not specified, the operator uses the file that is in the default location ../etc/connections.xml. If a relative path is specified, the path is relative to the root of the application directory.

period

This optional parameter specifies the time period, in seconds, that the operator waits before it tries to reconnect. You can use this parameter only when the reconnectionPolicy parameter is specified; otherwise, a compile time error occurs. The default value for the period parameter is 60.

reconnectionBound

This optional parameter specifies the number of successive connection attempts that the operator makes. You can use this parameter only when the reconnectionPolicy parameter is specified and set to BoundedRetry; otherwise, a compile time error occurs. The default value for the reconnectionBound parameter is 5.

reconnectionPolicy

This is an optional parameter that specifies the reconnection policy. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. If the parameter is not specified, the reconnection policy is set to BoundedRetry with a reconnectionBound of 5 and a period of 60 seconds.
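The interaction of reconnectionPolicy, reconnectionBound, and period can be sketched as a retry loop. The following Python model is a minimal sketch, not the operator's C++ code: `connect_with_policy` and its arguments are hypothetical names, and it assumes that reconnectionBound counts the total number of attempts, which the text does not state explicitly.

```python
import time


def connect_with_policy(connect, policy="BoundedRetry", bound=5, period=60.0,
                        sleep=time.sleep):
    """Call connect() until it returns True or the policy gives up.

    Returns the number of attempts made; raises ConnectionError on giving up.
    The defaults mirror the documented defaults (BoundedRetry, 5, 60 seconds).
    """
    if policy not in ("NoRetry", "BoundedRetry", "InfiniteRetry"):
        raise ValueError(f"unknown reconnection policy: {policy}")
    attempts = 0
    while True:
        attempts += 1
        if connect():
            return attempts
        if policy == "NoRetry":
            raise ConnectionError("connection failed and the policy is NoRetry")
        # Assumption: the bound limits the total number of attempts.
        if policy == "BoundedRetry" and attempts >= bound:
            raise ConnectionError(f"connection failed after {attempts} attempts")
        sleep(period)  # wait for the configured period before the next attempt
```

Passing `sleep` as an argument keeps the sketch testable without real delays. With InfiniteRetry the loop ends only when a connection succeeds.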

Code Templates

XMSSink

() as ${sinkPrefix}Sink = XMSSink(${inputStream}) {
    param
        connection : "${ConnectionSpecificationName}";
        access : "${AccessSpecificationName}";
}

Metrics

nConnectionAttempts - Counter

The number of connection attempts that are made before a successful connection.

nFailedInserts - Counter

The number of failed inserts to WebSphere MQ. A failed insert can occur when a message is dropped because of a run time error.

nTruncatedInserts - Counter

The number of tuples that have truncated attributes when they are converted to a message.

Libraries

IBM Message Service Client for C/C++ (XMS) libraries
Command
Library Name: gxi01l, gxi
Library Path: @XMS_HOME@/lib64
Include Path: @XMS_HOME@/tools/cpp/include, @XMS_HOME@/tools/c/include
WebSphere MQ Client libraries
Command
Library Path: @MQ_HOME@/lib64
Include Path: @MQ_HOME@/lib64
Code that is common to both XMSSource and XMSSink is located in a directory ../Common relative to both of these operators.
Command
Library Path: ../Common
Include Path: ../Common