RX Topic Subscriber

This example shows how to create a Topic Subscriber using the Universal Messaging Reactive library.

Application Source Code

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MyChannels.Nirvana.Samples
{
    /// <summary>
    /// This is an example of how RX extensions to the Universal Messaging .Net extended API enables integration with the Reactive
    /// Framework from Microsoft
    /// </summary>
    public class RXTopicSubscriber
    {
        // the url of the Universal Messaging realm
        private string url;
        // the channel to subscribe to
        private string channel;
        /// <summary>
        /// Pass the rname url of the realm and the channel to subscribe to
        /// </summary>
        /// <param name="rname">rname of the realm</param>
        /// <param name="rname">cname name of the channel</param>
        public RXTopicSubscriber(string rname, string cname)
        {
            url = rname;
            channel = cname;
            start();
        }
        /// <summary>
        /// Initialise the Session object and set up the Observable query
        /// </summary>
        private void start()
        {
            using (var session = new Session(url))
            {
                // Initialize the session
                session.Initialize();
                // Create consumer & query each message
                var consumer = session.Topics.CreateConsumer(channel);
                var query =
                    from e in consumer.ToObservable()
                    select e.Message;
                // Subscribe
                query.Subscribe(ProcessMessage);
                // Wait for input from the console, exit on key entry
                Console.ReadLine();
                Console.WriteLine("Exiting the application");
            }
        }
        /// <summary>
        /// Deal with the message
        /// </summary>
        /// <param name="m">the Message object received</param>
        public void ProcessMessage(Object m)
        {
            Console.WriteLine("Message :" + ((Message)m).Id);
        }
        static void Main(string[] args)
        {
            new RXTopicSubscriber(args[0], args[1]);
        }
    }
}