Message Streaming

Overview

This appendix describes the IBM webMethods message streaming feature, which allows you to stream large amounts of data or a large file from a message producer to a message consumer.

Introduction

The IBM webMethods streaming feature permits the streaming of large amounts of data/large files (greater than 1 Mb). The interface for this mechanism includes stream interfaces; these interfaces do not support transacted sessions.

Supporting Interfaces

Support for message streaming is provided through the following IBM webMethods interfaces:

JMS

  • Extensions to WmMessageProducer, which has methods to get the output stream object. In the message stream, there is a method to write data to the message producer.
  • Extension to WmMessageConsumer, which has a method to get the input stream object. In the message stream, there is a method to read data from a message consumer.

Implementation of the streaming API introduces two new classes: WmJMSInputStream and WmJMSOutputStream. These classes extend the java.io.InputStream and java.io.OutputStream classes, and are returned by WmMessageConsumer.getInputStream() and WmMessageProducer.getOutputStream().

The WmJMSOutputStream class streams data written to it by packaging the data into 1 Mb chunks and sending the data to the message producer's destination using a JMS BytesMessage. The quality of service (persistent or non-persistent), along with priority and expiration, will be the same as those set for the message producer. The WmJMSInputStream class asynchronously receives the BytesMessages and makes the data available when the stream is read.

Once the WmJMSOutputStream is closed, a final BytesMessage is sent with the provider-specific Boolean property JMS_WM_END_OF_STREAMset to true. When this is received by the WmJMSInputStream class, it will return -1 from read() to indicate that the end of the stream has been reached.

C#

  • Interfaces to IMessageProducer, which has a method returning class MsgOutputStream. You can use the class to write data.
  • Interfaces to IMessageConsumer, which has a method to read data from the class MsgInputStream.

Implementation of the streaming API introduces two new classes: MsgInputStream and MsgOutputStream. These extend the stream class and are returned by IMessageConsumer.getInputStream() and IMessageConsumer.getOutputStream().

The MsgOutputStream class streams data written to it by packaging the data and sending it to the message producer's destination using a BytesMessage. The quality of service (persistent or non-persistent), along with priority and expiration, will be the same as those set for the message producer. The MsgInputStream class asynchronously receives the BytesMessages and makes the data available when the stream is read.

Once the MsgOutputStream is closed, a final BytesMessage is sent with the provider-specific Boolean property JMS_WM_END_OF_STREAM set to true. When this is received by the MsgInputStream class, it will return 0 from read() indicating that the end of the stream has been reached.

Support for Multiple Message Producers

The message streaming feature supports the existence of multiple streams. To identify each stream:

  • JMS uses a stream identification property to identify the next unique string. You can get or set this string for recovery purposes.
    public  void setStreamID (String streamID);  
    public  void getStreamID ()
  • C# uses the following to identify multiple streams:
    public  String StreamID ()  
    {  
       set;  
       get;  
    }

The message consumer will create a Broker selector set to Broker Server (with the header property Begin_of_Stream set to true) when initially receiving the message.

Support for Read Timeout Setting

A user can specify the timeout for the WmJMSInputStream read function by the following API in the WmJMSInputStream class:

void setReadTimeout(long millisecond);

The WmReadTimeoutException will be thrown when a timeout occurs.

Recovery Operations

In the event of a failure on the part of the message producer or consumer, the interface provides several methods and properties to assist in recovery operations.

The following methods are used to support crash recovery. You need to use setStreamID() and getStreamID() (for JMS), and StreamID() (for C#), described earlier, to identify the correct stream in the event of a crash.

JMS

For WmJMSInputStream:

public  void  markStreamBytePosition (int pos)  
public  int   currentReadBytePosition ()

For WmJMSOutputStream:

public  void  markStreamBytePosition (int pos)  
public  int   currentWrittenBytePosition ()

C#

public  Long Position ()  
{  
   set;  
   get;  
}

On the message consumer side, if the user specified CLIENT_ACKNOWLEDGE during session creation, all of the messages will be acknowledged when the input stream is closed.

For additional information about specifying the location in a stream, see Technical Considerations.

Example

Following are sample code fragments for receiving and playing an audio file using WmJMSInputStream for JMS, and MsgInputStream for C#:

JMS

// Create a non-transacted session.  
Session sess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)  
    
// Create a message consumer.  
MessageConsumer consumer = sess.createConsumer(topic);  
    
// Wrap the message consumer’s input stream in an audio stream.  
AudioStream as = new AudioStream(as);  
    
// Start the audio player.  
AudioPlayer.player.start(as);

C#

// Create a non-transacted session.  
Session sess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)  
    
// Create a message consumer.  
IMessageConsumer consumer = sess.createConsumer(topic);  
MsgInputStream consumer is = consumer.getInputStream();  
    
// Wrap the message consumer’s input stream in an audio stream.  
AudioStream as = new AudioStream(as);  
    
// Start the audio player.  
AudioPlayer.player.start(as);

Size Guidelines

Consider using the IBM webMethods message streaming feature only when message size is 1 MB or greater; otherwise, use regular messaging.

For messages averaging between 1 MB and 1 GB in size, use CLIENT_ACKNOWLEDGE when creating a session. For messages averaging over 1 GB in size, use AUTO_ACKNOWLEDGE when creating a session.

Technical Considerations

Following is additional information on properties used to specify location in a message stream.

Property Description
STREAM_BYTE_POSITION Specifies the byte position of the stream that will be sent to the message consumer. The consumer stores this value locally.Whenever the consumer receives the incoming byte position number, and it is less than the local byte position number, it skips the streaming data until the incoming byte position number exceeds that value. Thus, the property provides a means for handling situations where the producer fails during the streaming process and attempts recovery.
JMS_WM_END_OF_STREAM Identifies the end of a stream. When the message consumer receives this value, it will clear the filter on the Broker Server side.