Extending the .NETInput node for MSMQ sample

You can extend the sample to use the Event-driven template to receive MSMQ messages, implementing the same MSMQ input.

In the sample, the C# code in the .NETInput node uses the Polling template to receive messages from MSMQ. You can achieve the same result with the Event-driven template by using the following code:

public class RetrieveFromMsmqUsingEvent : NBEventInputConnector
    {
        /// <summary>
        /// MSMQ queue
        /// </summary>
        private MessageQueue msgQ;

        /// <summary>
        /// MSMQ queue name
        /// </summary>
        private string queueName;

        /// <summary>
        /// Initializes a new instance of the <see cref="RetrieveFromMsmqUsingEvent" /> class.
        /// </summary>
        /// <param name="connectorFactory">The owning factory.</param>
        /// <param name="name">The name of the connector.</param>
        /// <param name="properties">Contains any user properties placed on the node using this connector, 
        /// and any flow level User Defined Properties.</param>
        public RetrieveFromMsmqUsingEvent(NBConnectorFactory connectorFactory, 
        string name, Dictionary<string, string> properties)
            : base(connectorFactory, name, properties)
        {
            if (properties.ContainsKey("queueName"))
            {
                this.queueName = properties["queueName"];
            }
            else
            {
                this.queueName = "IBDotNetReplyQueue";
            }
        }

        /// <summary>
        /// Start Method
        /// </summary>
        public override void Start()
        {
            this.msgQ = DotNetInputUtility.CreateOrReadPrivateQueue(this.queueName);
            this.msgQ.MessageReadPropertyFilter.CorrelationId = true;

            // Add an event handler for the ReceiveCompleted event.
            this.msgQ.ReceiveCompleted += new ReceiveCompletedEventHandler(this.MessageReceived);

            // Begin the asynchronous receive operation.
            this.msgQ.BeginReceive();
        }

        /// <summary>
        /// Finish Method
        /// </summary>
        public override void Finish()
        {
            this.msgQ.Close();
        }

        /// <summary>
        /// Receive message from MSMQ using asynchronous Receive operation.
        /// </summary>
        /// <param name="source">source object</param>
        /// <param name="asyncResult">result for received message</param>
        private void MessageReceived(object source, ReceiveCompletedEventArgs asyncResult)
        {
            MessageQueue mq = null;
            try
            {
                // Connect to the queue
                mq = (MessageQueue)source;

                // End the asynchronous Receive operation.
                Message msmqMsg = mq.EndReceive(asyncResult.AsyncResult);

                // Process the message here.
                // If the correlation ID is empty, use the ID of the input message instead.
                string msgId = string.IsNullOrEmpty(msmqMsg.CorrelationId) ?
                    msmqMsg.Id : msmqMsg.CorrelationId;
                byte[] msgBlob = System.Text.Encoding.Default.GetBytes(
                    new StreamReader(msmqMsg.BodyStream).ReadToEnd());
                NBEvent msmqEvent = new MsmqEvent(this, msgBlob, msgId);
                this.DeliverEvent(msmqEvent);
            }
            finally
            {
                // Restart the asynchronous Receive operation.
                this.msgQ.BeginReceive();
            }
        }
    }

Define the MsmqEvent class so that the correlation ID can be saved in the LocalEnvironment:

public class MsmqEvent : NBByteArrayInputEvent
    {
        /// <summary>
        /// Correlation id of MSMQ
        /// </summary>
        private string correlationId;

        /// <summary>
        /// Initializes a new instance of the <see cref="MsmqEvent" /> class
        /// </summary>
        /// <remarks>
        /// This event places the correlation ID in the LocalEnvironment.
        /// </remarks>
        /// <param name="connector">The connector</param>
        /// <param name="value">The input byte array </param>
        /// <param name="correlationId">The correlation id </param>
        public MsmqEvent(NBEventInputConnector connector, byte[] value, string correlationId)
            : base(connector, value)
        {
            this.correlationId = correlationId;
        }

        /// <summary>
        /// Build the properties containing the correlation id to be placed in the LocalEnvironment.
        /// </summary>
        /// <returns>A Dictionary of properties</returns>
        public override Dictionary<string, string> BuildProperties()
        {
            Dictionary<string, string> properties = new Dictionary<string, string>();
            properties.Add(DotNetInputUtility.CorrelationID, this.correlationId);
            return properties;
        }
    }

Extending the sample to create an MSMQ Input node by using the .NETInput Cloned node.

The .NETInput node with C# code that can receive messages from MSMQ can be made into a reusable MSMQ Input node with its own icon, and retrieved from the palette.

Complete the following steps:

  1. Open the SimulateMSMQApp.msgflow message flow.
  2. Right-click the .NETInput From MSMQ .NETInput node.
  3. Select Create Cloned node...
  4. In the dialog box, enter values for the node Settings and optionally choose icons for the node Icons. For this example, enter "MSMQInput" for Name.
  5. Click Finish. When the user-defined node (UDN) generation process completes, a new MSMQInput node is created. You can select it from the .NET category in the flow palette.

For information about creating a Cloned node from a .NETInput node, see .NETInput node in the IBM Integration Bus Information Center.

Extending the sample to edit C# sample code and debug it with HotSwap from Visual Studio.

In this sample, the .NET assembly is included in the BAR file named DotNetInput.bar, packaged in a .NET application domain. You can deploy the BAR file directly to run this sample. For information about how to deploy the packaged .NET assembly, see Deploying .NET Assemblies in the IBM Integration Bus Information Center.

If you want to load the .NET assembly directly from your broker file system and debug it with HotSwap from Visual Studio, complete the following steps:

  1. Open the .NET solution in Visual Studio. The source code is supplied with the IBM Integration Bus runtime installation. If you chose the default installation path, the solutions are located in C:\Program Files\IBM\MQSI\9.0.0.0\sample\DotNet\DotNetInput\VS2010 and C:\Program Files\IBM\MQSI\9.0.0.0\sample\DotNet\DotNetInput\VS2012.
  2. Edit the C# code and rebuild the solution. Open the message flow editor, associate your compiled assembly with a .NETInput node, and complete the Assembly name and Class name properties on the node.
  3. Optional: change the existing application domain by following the guidance in Implicit application domains.
  4. Drag the application onto the integration server to deploy it.
  5. To debug the code with HotSwap, attach Visual Studio to the process named DataFlowEngine.exe.

Extending the sample to create and receive messages from MSMQ transactional queues.

This sample uses the .NET System.Messaging API to send and receive non-transactional messages using MSMQ. To use transactional messages, change the C# code as follows:

// Create a transactional queue.
// The verbatim string (@"...") keeps the backslashes in the queue path literal.
MessageQueue msgQ = MessageQueue.Create(@".\private$\IBDotNetInputQueue", true);

// Send two messages to a transactional queue in a single transaction
MessageQueueTransaction msgTx = new MessageQueueTransaction();
msgTx.Begin();
Message msg = new Message("This is a transactional message");
msgQ.Send(msg, msgTx);
Message msg2 = new Message("This is a transactional message");
msgQ.Send(msg2, msgTx);
msgTx.Commit();

// Receive one message from a transactional queue
MessageQueueTransaction msgTx2 = new MessageQueueTransaction();
msgTx2.Begin();
// Pass the existing TimeSpan timeout parameter
Message receivedMsg = msgQ.Receive(timeSpan, msgTx2);
msgTx2.Commit();
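
The receive shown above commits unconditionally. The following is a minimal sketch, not part of the supplied sample, of one way to abort the transaction when the receive times out or processing fails, so that the message stays on the queue. The class name TransactionalReceiveSketch and the method ReceiveBody are hypothetical; only the System.Messaging calls are taken from the .NET Framework API.

```csharp
using System;
using System.IO;
using System.Messaging;

// Hypothetical helper for illustration; not part of the supplied sample.
public static class TransactionalReceiveSketch
{
    // Receives one message inside its own MSMQ transaction.
    // Returns the raw body text, or null if no message arrived within the timeout.
    public static string ReceiveBody(MessageQueue queue, TimeSpan timeout)
    {
        using (MessageQueueTransaction tx = new MessageQueueTransaction())
        {
            tx.Begin();
            try
            {
                Message msg = queue.Receive(timeout, tx);
                string body;
                using (StreamReader reader = new StreamReader(msg.BodyStream))
                {
                    body = reader.ReadToEnd();
                }

                // Commit only after the body has been read successfully.
                tx.Commit();
                return body;
            }
            catch (Exception ex)
            {
                // Roll back so that a received message returns to the queue.
                if (tx.Status == MessageQueueTransactionStatus.Pending)
                {
                    tx.Abort();
                }

                // Treat a receive timeout as "no message" rather than as an error.
                MessageQueueException mqEx = ex as MessageQueueException;
                if (mqEx != null && mqEx.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    return null;
                }

                throw;
            }
        }
    }
}
```

A call might look like `string body = TransactionalReceiveSketch.ReceiveBody(msgQ, TimeSpan.FromSeconds(5));`, where the five-second timeout is an arbitrary value chosen for illustration.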
	

The transactional receive of a message can also be integrated with the commit or rollback of the message flow itself. To do so, override the Confirm and MarkAsFailed methods on your subclass of NBPollingResult or NBEvent. Store a reference to the MessageQueueTransaction object in your subclass, and use it to commit or roll back the transaction when the broker calls these methods. Note that the Confirm and MarkAsFailed methods are called as part of the broker's one-phase commit processing.
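
The following sketch shows one possible way to hold the MSMQ transaction open until the outcome of the flow is known. The class PendingMsmqReceive and its members are hypothetical and use only System.Messaging; the exact signatures of Confirm and MarkAsFailed are not shown on this page, so check them in the IBM Integration Bus .NET API documentation. The assumption is that your Confirm override would call Complete() and your MarkAsFailed override would call Fail().

```csharp
using System;
using System.Messaging;

// Hypothetical holder for illustration: keeps the transaction pending
// until the message flow reports success or failure.
public sealed class PendingMsmqReceive : IDisposable
{
    private readonly MessageQueueTransaction transaction;
    private readonly Message receivedMessage;

    private PendingMsmqReceive(MessageQueueTransaction transaction, Message receivedMessage)
    {
        this.transaction = transaction;
        this.receivedMessage = receivedMessage;
    }

    // The message that was received under the pending transaction.
    public Message ReceivedMessage
    {
        get { return this.receivedMessage; }
    }

    // Begins a transaction and receives one message without committing.
    public static PendingMsmqReceive Receive(MessageQueue queue, TimeSpan timeout)
    {
        MessageQueueTransaction tx = new MessageQueueTransaction();
        try
        {
            tx.Begin();
            Message msg = queue.Receive(timeout, tx);
            return new PendingMsmqReceive(tx, msg);
        }
        catch
        {
            AbortIfPending(tx);
            tx.Dispose();
            throw;
        }
    }

    // Intended to be called from a Confirm override: removes the message permanently.
    public void Complete()
    {
        this.transaction.Commit();
        this.transaction.Dispose();
    }

    // Intended to be called from a MarkAsFailed override: returns the message to the queue.
    public void Fail()
    {
        AbortIfPending(this.transaction);
        this.transaction.Dispose();
    }

    // Safety net: a transaction that was never completed is rolled back.
    public void Dispose()
    {
        AbortIfPending(this.transaction);
        this.transaction.Dispose();
    }

    private static void AbortIfPending(MessageQueueTransaction tx)
    {
        if (tx.Status == MessageQueueTransactionStatus.Pending)
        {
            tx.Abort();
        }
    }
}
```

Keeping the commit and abort logic in a separate holder means the NBPollingResult or NBEvent subclass only needs to store one reference and forward the broker's callbacks to it.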
