In the sample, the C# code in the .NETInput node uses the Polling template to receive messages from MSMQ. You can also use an Event template to do the same, by using the following code:
using System.Collections.Generic;
using System.IO;
using System.Messaging;
// The NB* connector classes also need the using directive for the IBM Integration Bus .NET API.

public class RetrieveFromMsmqUsingEvent : NBEventInputConnector
{
    /// <summary>
    /// MSMQ queue
    /// </summary>
    private MessageQueue msgQ;

    /// <summary>
    /// MSMQ queue name
    /// </summary>
    private string queueName;

    /// <summary>
    /// Initializes a new instance of the <see cref="RetrieveFromMsmqUsingEvent" /> class.
    /// </summary>
    /// <param name="connectorFactory">The owning factory.</param>
    /// <param name="name">The name of the connector.</param>
    /// <param name="properties">Contains any user properties placed on the node using this connector,
    /// and any flow level User Defined Properties.</param>
    public RetrieveFromMsmqUsingEvent(NBConnectorFactory connectorFactory, string name, Dictionary<string, string> properties)
        : base(connectorFactory, name, properties)
    {
        if (properties.ContainsKey("queueName"))
        {
            this.queueName = properties["queueName"];
        }
        else
        {
            this.queueName = "IBDotNetReplyQueue";
        }
    }

    /// <summary>
    /// Start Method
    /// </summary>
    public override void Start()
    {
        this.msgQ = DotNetInputUtility.CreateOrReadPrivateQueue(this.queueName);
        this.msgQ.MessageReadPropertyFilter.CorrelationId = true;

        // Add an event handler for the ReceiveCompleted event.
        this.msgQ.ReceiveCompleted += new ReceiveCompletedEventHandler(this.MessageReceived);

        // Begin the asynchronous receive operation.
        this.msgQ.BeginReceive();
    }

    /// <summary>
    /// Stop Method
    /// </summary>
    public override void Finish()
    {
        this.msgQ.Close();
    }

    /// <summary>
    /// Receive message from MSMQ using asynchronous Receive operation.
    /// </summary>
    /// <param name="source">source object</param>
    /// <param name="asyncResult">result for received message</param>
    private void MessageReceived(object source, ReceiveCompletedEventArgs asyncResult)
    {
        try
        {
            // Connect to the queue.
            MessageQueue mq = (MessageQueue)source;

            // End the asynchronous Receive operation.
            Message msmqMsg = mq.EndReceive(asyncResult.AsyncResult);

            // Process the message here.
            // When the correlation ID is empty, use the ID of the input message instead.
            string msgId = string.IsNullOrEmpty(msmqMsg.CorrelationId) ? msmqMsg.Id : msmqMsg.CorrelationId;
            byte[] msgBlob = System.Text.Encoding.Default.GetBytes(
                new StreamReader(msmqMsg.BodyStream).ReadToEnd());
            NBEvent msmqEvent = new MsmqEvent(this, msgBlob, msgId);
            this.DeliverEvent(msmqEvent);
        }
        finally
        {
            // Restart the asynchronous Receive operation.
            this.msgQ.BeginReceive();
        }
    }
}
Define the MsmqEvent class, so that the correlation ID can be saved in the Local Environment:
public class MsmqEvent : NBByteArrayInputEvent
{
    /// <summary>
    /// Correlation id of MSMQ
    /// </summary>
    private string correlationId;

    /// <summary>
    /// Initializes a new instance of the <see cref="MsmqEvent" /> class
    /// </summary>
    /// <remarks>
    /// This event will build correlation id into Local Environment
    /// </remarks>
    /// <param name="connector">The connector</param>
    /// <param name="value">The input byte array</param>
    /// <param name="correlationId">The correlation id</param>
    public MsmqEvent(NBEventInputConnector connector, byte[] value, string correlationId)
        : base(connector, value)
    {
        this.correlationId = correlationId;
    }

    /// <summary>
    /// Build the properties containing the correlation id to be placed in the LocalEnvironment.
    /// </summary>
    /// <returns>A Dictionary for properties</returns>
    public override Dictionary<string, string> BuildProperties()
    {
        Dictionary<string, string> properties = new Dictionary<string, string>();
        properties.Add(DotNetInputUtility.CorrelationID, this.correlationId);
        return properties;
    }
}
The .NETInput node with C# code that can receive messages from MSMQ can be made into a reusable MSMQ Input node with its own icon, and retrieved from the palette.
Complete the following steps:
For information about creating a Cloned node from a .NETInput node, see .NETInput node in the IBM Integration Bus Information Center.
In this sample, the .NET assembly is included in the BAR file named DotNetInput.bar, packaged in a .NET application domain. You can deploy the BAR file directly to run this sample. For information about how to deploy the packaged .NET assembly, see Deploying .NET Assemblies in the IBM Integration Bus Information Center.
If you want to load the .NET assembly directly from your broker file system and debug it with HotSwap from Visual Studio, you can execute the following manual steps:
This sample uses the .NET System.Messaging API to send and receive non-transactional messages using MSMQ. If you want to use transactional messages, you can change the C# code as shown in the following example:
// Create a transactional queue.
// The verbatim string (@"...") keeps the backslashes in the queue path literal.
MessageQueue msgQ = MessageQueue.Create(@".\private$\IBDotNetInputQueue", true);

// Send two messages to the transactional queue in a single transaction.
MessageQueueTransaction msgTx = new MessageQueueTransaction();
msgTx.Begin();
Message msg = new Message("This is a transactional message");
msgQ.Send(msg, msgTx);
Message msg2 = new Message("This is a transactional message");
msgQ.Send(msg2, msgTx);
msgTx.Commit();

// Receive one message from the transactional queue.
MessageQueueTransaction msgTx2 = new MessageQueueTransaction();
msgTx2.Begin();
// Pass the existing TimeSpan timeout parameter.
Message receivedMsg = msgQ.Receive(timeSpan, msgTx2);
msgTx2.Commit();
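A transactional receive with a timeout can fail, for example when no message arrives in time. The following is a minimal sketch of one way to handle that case, and is not part of the sample. The class name TransactionalReceiveSketch and the method TryReceive are hypothetical; only the System.Messaging calls are real API.

```csharp
using System;
using System.Messaging;

// Hypothetical helper for illustration only; not part of the sample.
public static class TransactionalReceiveSketch
{
    // Returns the received message, or null if the timeout expired.
    public static Message TryReceive(MessageQueue queue, TimeSpan timeout)
    {
        using (MessageQueueTransaction tx = new MessageQueueTransaction())
        {
            tx.Begin();
            try
            {
                Message received = queue.Receive(timeout, tx);
                tx.Commit();
                return received;
            }
            catch (MessageQueueException ex)
            {
                // Abort so that the message, if any, stays on the queue.
                if (tx.Status == MessageQueueTransactionStatus.Pending)
                {
                    tx.Abort();
                }

                if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // No message arrived within the timeout.
                    return null;
                }

                throw;
            }
        }
    }
}
```

A caller could then poll with a call such as TransactionalReceiveSketch.TryReceive(msgQ, TimeSpan.FromSeconds(5)) and treat a null result as "nothing to process".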
The transactional receive of a message can also be integrated with the commit or rollback of the message flow itself. To do this, override the Confirm and MarkAsFailed methods on your subclass of NBPollingResult or NBEvent. Store a reference to the MessageQueueTransaction object within your subclass, and use it to commit the transaction (Commit) or roll it back (Abort) when the broker calls these methods. Note that the Confirm and MarkAsFailed methods are called as part of the broker's one-phase commit processing.
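The following sketch shows one way to hold the MessageQueueTransaction until the broker reports the outcome of the flow. It is an illustration under assumptions, not the sample's code: PendingMsmqReceive is a hypothetical helper, and the exact signatures of Confirm and MarkAsFailed are not shown here, so check them in the .NET API documentation before overriding them. An NBEvent or NBPollingResult subclass would keep an instance of this helper and call Commit from its Confirm override and Abort from its MarkAsFailed override.

```csharp
using System;
using System.Messaging;

// Hypothetical helper for illustration only; not part of the sample or the IBM API.
public sealed class PendingMsmqReceive : IDisposable
{
    private readonly MessageQueueTransaction transaction;

    public PendingMsmqReceive(MessageQueueTransaction transaction, Message message)
    {
        if (transaction == null)
        {
            throw new ArgumentNullException("transaction");
        }

        this.transaction = transaction;
        this.Message = message;
    }

    // The message that was received under the pending transaction.
    public Message Message { get; private set; }

    // Intended to be called from the overridden Confirm method.
    public void Commit()
    {
        this.End(true);
    }

    // Intended to be called from the overridden MarkAsFailed method.
    public void Abort()
    {
        this.End(false);
    }

    public void Dispose()
    {
        this.transaction.Dispose();
    }

    private void End(bool commit)
    {
        try
        {
            // Only a pending transaction can be committed or aborted.
            if (this.transaction.Status == MessageQueueTransactionStatus.Pending)
            {
                if (commit)
                {
                    this.transaction.Commit();
                }
                else
                {
                    this.transaction.Abort();
                }
            }
        }
        finally
        {
            this.transaction.Dispose();
        }
    }
}
```

With this arrangement the message is removed from the queue only after the flow completes successfully. If the flow fails, the abort returns the message to the queue.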