Using MQ Telemetry Transport with WebSphere Business Integration Message Broker

WebSphere® MQ Telemetry Transport (MQTT) is a messaging technology that was developed at the IBM® Hursley Laboratory and released in 2001. The transport was designed after IBM analyzed how its customers used WebSphere MQ messaging in their businesses, including the data moving through it. IBM discovered that data was often generated outside the enterprise at remote locations, and typically suffered a tortuous journey from the remote location to the edge of the enterprise. At that point the data was often keyed manually into a computer, and only then transported by the WebSphere MQ enterprise messaging system. MQTT was developed to extend the reach of WebSphere MQ messaging to these remote locations.

WebSphere Business Integration Message Broker, part of the WebSphere MQ product family, is a publish/subscribe message broker that supports several transports besides MQTT. Other transports include MQ Mobile Transport for hand-held pervasive devices, MQ Multicast Transport for high-volume applications such as trading floors, and MQ Real-time Transport for highly scalable Internet/intranet applications. The WebSphere Business Integration Message Broker interconnects these diverse transports to create a seamless messaging fabric, regardless of device or platform.

MQTT is a TCP/IP-based protocol with the following characteristics: very low message overhead (a fixed header as small as 2 bytes), a small and easy-to-implement client, and support for the publish/subscribe messaging pattern. If these are not the requirements of your application, one of the other available WebSphere MQ transports may be a better fit.

The remainder of this article will guide you through the process of writing an MQTT subscriber application in Java. The article assumes you have a running installation of WebSphere Business Integration Message Broker version 2.0.2 or above. The MQTT library available in the resources section is required to compile this article's example.

Connecting MQTT

To use MQ Telemetry Transport, a connection must be made to the WebSphere Business Integration Message Broker. Connecting is a simple multi-step process: generate a connection string, create the MQTT client object, determine a unique client ID, pick a cleanstart setting, and choose a keepalive value.

The connection string is a URL-formatted string with the connection type in place of the protocol. The two valid connection types are tcp and local. Connection type tcp is used to connect to a broker on a remote machine. The local connection type provides communication optimizations when used with MicroBroker, an IBM messaging product for embedded and pervasive systems, and then only when your client is running in the same Java Virtual Machine as the MicroBroker. Local connection optimizations and MicroBrokers are outside the scope of this article, so our example uses connection type tcp. With type tcp, a port must be specified after the host, separated from it by a ":". The IANA-assigned default port for the Telemetry Transport on WebSphere Business Integration Message Broker is 1883. An example of a valid tcp connection string is tcp://remotehost:1883.

Once the connection string is formulated, the MQTT client object can be created, and it is then ready to connect to the WebSphere Business Integration Message Broker. However, before calling the connect method, values for its three parameters must be chosen.
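
The tcp connection string format described above can be sanity-checked before it is handed to the client. The following is a minimal sketch that uses only java.net.URI from the Java standard library. The class ConnectionStringCheck and its method isValidTcp are hypothetical names introduced for illustration; they are not part of the MQTT client library, and the sketch deliberately rejects the local connection type.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper for illustration; not part of the MQTT client library. */
class ConnectionStringCheck {

    /**
     * Returns true if the string has the form tcp://host:port
     * with an explicit port in the range 1 to 65535.
     */
    static boolean isValidTcp(String connectionString) {
        try {
            URI uri = new URI(connectionString);
            int port = uri.getPort(); // -1 when no port was given
            return "tcp".equals(uri.getScheme())
                && uri.getHost() != null
                && port > 0 && port <= 65535;
        } catch (URISyntaxException e) {
            return false; // not URL formatted at all
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidTcp("tcp://remotehost:1883"));
        System.out.println(isValidTcp("tcp://remotehost"));
    }
}
```

A check like this only catches formatting mistakes early; whether a broker is actually listening at that address is still discovered when connect is called.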

The first of these parameters is the client ID, a string-based identifier that must be unique to each MQTT client connecting to the broker. Our example uses a static constant; however, in real-world applications a unique identifier may have to be generated at runtime to allow multiple instances of a client application to run concurrently. If a client connects to a broker with the same client ID as a currently connected client, the broker disconnects the currently connected client.
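
One way to generate a client ID at runtime is to append a random suffix to a fixed prefix. The sketch below is illustrative only: ClientIdGenerator and newClientId are hypothetical names, and the 23-character cap is an assumption taken from the client identifier limit in the MQTT V3 protocol specification, so check the limit that applies to your broker. A random suffix makes collisions very unlikely but does not strictly guarantee uniqueness.

```java
import java.util.UUID;

/** Hypothetical helper for illustration; not part of the MQTT client library. */
class ClientIdGenerator {

    /** Assumed maximum client ID length (MQTT V3 protocol limit). */
    static final int MAX_CLIENT_ID_LENGTH = 23;

    /** Keep the prefix short so that at least 12 random characters remain. */
    static final int MAX_PREFIX_LENGTH = 11;

    /** Builds an ID of exactly MAX_CLIENT_ID_LENGTH characters: prefix + random hex. */
    static String newClientId(String prefix) {
        if (prefix == null || prefix.length() > MAX_PREFIX_LENGTH) {
            throw new IllegalArgumentException(
                "prefix must be at most " + MAX_PREFIX_LENGTH + " characters");
        }
        // A random UUID without dashes yields 32 hexadecimal characters.
        String randomHex = UUID.randomUUID().toString().replace("-", "");
        return (prefix + randomHex).substring(0, MAX_CLIENT_ID_LENGTH);
    }

    public static void main(String[] args) {
        System.out.println(newClientId("client"));
    }
}
```

Because the ID changes on every run, this approach suits clients that connect with a clean start. A client that expects the broker to hold messages for it between connections would presumably need a stable ID instead, since the broker identifies a returning client by its client ID.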

The second parameter is a boolean termed cleanstart, which is associated with the notion of "durable" and "non-durable" subscriptions. Generally, messages on topics with a Quality of Service (QoS) of 1 or 2 (more on this later) are held on the broker for subscribers that are not currently connected. When cleanstart is false and the client disconnects from the broker, the broker preserves messages destined for the client and delivers them when the client reconnects. When cleanstart is true, the broker does not preserve messages for the disconnected client. The API documentation provided with the MQTT client informs developers, rather ominously and in bold capital letters, to "USE WITH CARE" when setting cleanstart. Do not be too alarmed; incorrect use of cleanstart has never caused a fatality. The API documentation urges caution only because setting cleanstart to true asks the broker to violate the contract for assured delivery of QoS 1 and 2 messages. A cleanstart value of true is most often used when a device or application needs to be able to restart in a known "clean" state. True should also be used if the client does not require messages to be stored while it is not connected; this ensures that unnecessary messages are not stored on the broker.

The last parameter, keepalive, is a value of type short, measured in seconds. This value is shared between the client and the broker. The broker terminates the connection to the client if no data is received during this keep-alive interval. Since clients do not wish to be forcefully disconnected, they automatically send a keep-alive message if the connection is inactive for the number of seconds specified by this parameter. Care should be taken when choosing a value: too short, and a low-bandwidth connection will be overrun by keep-alive messages; too long, and notification of a client disconnection will not be received in a timely manner. A good value for standard applications is 30 seconds. With values chosen for our three parameters, we are ready to connect.

Listing 1. Connecting to the broker
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;

/**
* Example of an Mqtt Subscription
**/
public class SubscriptionExample {

   /* MQTT Connection Parameters */
   private final static String CONNECTION_STRING = "tcp://brokerhost:1883";
   private final static boolean CLEAN_START = true;
   private final static short KEEP_ALIVE = 30;
   private final static String CLIENT_ID = "client1";

       /**
       *  Main method.  Creates a new Subscription Example Object.
       **/
       public static void main(String[] args){
          new SubscriptionExample();
       }

       /**
       * Constructor for Subscription Example.
       * Creates and connects a client.
       */
       public SubscriptionExample(){

           try{
               MqttClient client = new MqttClient( CONNECTION_STRING );
               client.connect( CLIENT_ID, CLEAN_START, KEEP_ALIVE );
           }
           catch( MqttException exception ){

                System.out.println( "An error occurred connecting to broker " +
                                    CONNECTION_STRING );

                System.out.println( exception.getMessage() );

                Throwable cause = exception.getCause();
                if( cause != null ){
                    System.out.println( "Caused by " + cause.getMessage() );
                }
           }
      }
}

Subscribing

Upon connecting, most subscribers select one or more topics to subscribe to. Once a client is subscribed to a topic, all messages published on that topic are sent to it. Subscriptions are handled by the MQTT client object, which passes the appropriate information to its broker. The method for subscribing is aptly named "subscribe" and requires two parameters, both arrays. The two arrays are "index associated", meaning that the values at the same position in each array belong together. For example, the value in slot 0 of the first array is associated with the value in slot 0 of the second array.
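
The index association between the two arrays can be made concrete with a short sketch. The class SubscriptionPairs and its method describe are hypothetical names used only for illustration. The sketch pairs each topic with the value at the same position in the second array, and rejects arrays of different lengths, since an unmatched entry would have no partner.

```java
/** Hypothetical helper for illustration; not part of the MQTT client library. */
class SubscriptionPairs {

    /** Returns one line per subscription, pairing topics[i] with qosValues[i]. */
    static String describe(String[] topics, int[] qosValues) {
        if (topics.length != qosValues.length) {
            throw new IllegalArgumentException(
                "Each topic needs exactly one value in the second array");
        }
        StringBuilder description = new StringBuilder();
        for (int i = 0; i < topics.length; i++) {
            description.append(topics[i])
                       .append(" -> QoS ")
                       .append(qosValues[i])
                       .append('\n');
        }
        return description.toString();
    }

    public static void main(String[] args) {
        String[] topics = { "Test/TestTopics/Topic1", "Test/TestTopics/Topic2" };
        int[] qosValues = { 0, 2 };
        System.out.print(describe(topics, qosValues));
    }
}
```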

The first array, the topic array, is an array of String objects. Each entry in the array is the name of a topic the client is interested in subscribing to. Topics are usually arranged in a hierarchy and delimited by the '/' character. The wildcard characters '#' and '+' facilitate subscription to multiple topics matching a pattern. The '#' is a greedy wildcard and matches all topics to the right of it. The '+' matches exactly one '/'-delimited level of the topic space. An example where the '+' wildcard is valuable is an application that monitors fire alarms on an office building campus. One common topic space for such a system would be FireAlarms/<Building>/<Room>/Alarm, where <Building> and <Room> describe the location of each fire alarm on campus. If an application needed to subscribe to all alarms, it would use the topic string "FireAlarms/+/+/Alarm". Another use of wildcards might be to subscribe to all information from a specific weather station. The easiest way to do this is to use the '#' wildcard character. For example, the topic string "Weather/London/#" would match all topics for the London weather station.
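
To make the wildcard rules concrete, the following sketch shows how a subscription string could be compared against a topic name. Matching is actually performed by the broker, so this code is never needed in a subscriber; TopicMatcher and matches are hypothetical names, and the sketch assumes that '#' appears only as the last level of the subscription string. It also lets '#' match zero remaining levels, which is how the MQTT protocol specification describes it; the broker's own behavior remains authoritative.

```java
/** Illustrative only: the broker, not the client, performs topic matching. */
class TopicMatcher {

    /** Returns true if the topic name is covered by the subscription string. */
    static boolean matches(String subscription, String topic) {
        // A limit of -1 keeps empty trailing levels, e.g. "a/" has two levels.
        String[] filterLevels = subscription.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                return true; // greedy: covers everything from this level on
            }
            if (i >= topicLevels.length) {
                return false; // topic has fewer levels than the subscription
            }
            boolean singleLevelWildcard = filterLevels[i].equals("+");
            if (!singleLevelWildcard && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        // Without '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("FireAlarms/+/+/Alarm", "FireAlarms/Building1/Room101/Alarm"));
        System.out.println(matches("Weather/London/#", "Weather/London/Temperature"));
        System.out.println(matches("FireAlarms/+/+/Alarm", "FireAlarms/Building1/Alarm"));
    }
}
```

Note how "FireAlarms/+/+/Alarm" does not match "FireAlarms/Building1/Alarm": each '+' stands for exactly one level, so the number of levels must agree.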

The second array is of type int. It holds an integer Quality of Service (QoS) value for each entry in the topic array. The three possible QoS values for each topic subscription are 0, 1, and 2. A QoS of 0 denotes that messages are delivered to this subscriber on a best-effort basis; that is, no guarantee is made that the message will be received by the subscriber. A QoS of 1 guarantees the delivery of all messages on this topic to the subscriber; however, a message may be delivered more than once. Use QoS 2 when a message is to be delivered once and only once. Keep in mind that as the QoS number increases, so does the overhead associated with each transmission.

At first glance, it may seem counterintuitive to set the QoS on the subscription, especially when the QoS for each message is set by the MQTT publisher. However, this mechanism enables a subscriber to downgrade the QoS of a message. The following example demonstrates its value. Suppose an MQTT publisher is embedded in an electric meter on the side of your house. This publisher sends kilowatt usage information to your utility company. It is in the best interest of the utility company to ensure that every message is delivered at least once, because undelivered messages result in lost revenue. However, it is equally important to you, the customer, that each message is delivered only once. This is a prime example of when both a publisher and a subscriber should use a QoS value of 2. Now, imagine this advanced utility company allows you to connect with a Java-based MQTT subscriber client to graph your kilowatt usage over the last day. This application does not need the guarantees of QoS 2 and could downgrade its subscription to QoS 1 or 0, still be effective, and enjoy the benefits of lighter-weight transactions.
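
The downgrade described above can be summarized in a few lines. This is a sketch under the assumption that the broker delivers each message at the lower of the QoS it was published with and the QoS requested on the subscription; EffectiveQos and delivered are hypothetical names, and the calculation happens in the broker, not in your client.

```java
/** Illustrative only: models the QoS at which a message reaches a subscriber. */
class EffectiveQos {

    /** Assumes delivery at the lower of the published and subscribed QoS. */
    static int delivered(int publishedQos, int subscribedQos) {
        requireValid(publishedQos);
        requireValid(subscribedQos);
        return Math.min(publishedQos, subscribedQos);
    }

    /** The only QoS values are 0, 1 and 2. */
    private static void requireValid(int qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2, but was " + qos);
        }
    }

    public static void main(String[] args) {
        // The meter publishes at QoS 2; the graphing client subscribes at QoS 0.
        System.out.println(delivered(2, 0));
        // The utility company subscribes at QoS 2 and keeps the full guarantee.
        System.out.println(delivered(2, 2));
    }
}
```

Under this assumption a subscription can lower the QoS of a message but never raise it: a message published at QoS 0 stays at QoS 0 no matter what the subscriber requests.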

The following code shows an example subscription to three topics.

Listing 2. Subscribing. A subscribe method is added, along with the two subscription arrays as static constants.
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttNotConnectedException;

/**
* Example of an Mqtt Subscription
**/
public class SubscriptionExample {

   /* MQTT Connection Parameters */
   private final static String CONNECTION_STRING = "tcp://brokerhost:1883";
   private final static boolean CLEAN_START = true;
   private final static short KEEP_ALIVE = 30;
   private final static String CLIENT_ID = "client1";

   private final static String[] TOPICS = { "Test/TestTopics/Topic1",
                                            "Test/TestTopics/Topic2",
                                            "Test/TestTopics/Topic3" };

   private final static int[] QOS_VALUES = { 0, 1, 2 };

       /**
       *  Main method.  Creates a new Subscription Example Object.
       **/
       public static void main(String[] args){
          new SubscriptionExample();
       }

       /**
       * Constructor for Subscription Example.
       * Creates and connects a client. Initiates
       * subscriptions to topics.
       */
       public SubscriptionExample(){

           try{
               MqttClient client = new MqttClient( CONNECTION_STRING );
               client.connect( CLIENT_ID, CLEAN_START, KEEP_ALIVE );

               try{
                  subscribe( client );
               }
               catch( MqttNotConnectedException exception ){
                    System.out.println( "Connection did not exist " +
                                        "during subscription attempt." );

                    System.out.println( exception.getMessage() );
               }
               catch( MqttException exception ){

                    System.out.println( "An error occurred subscribing to topics." );
                    System.out.println( exception.getMessage() );

                    Throwable cause = exception.getCause();
                    if( cause != null ){
                        System.out.println( "Caused by " + cause.getMessage() );
                    }
               }
           }
           catch( MqttException exception ){

                System.out.println( "An error occurred connecting to broker " +
                                    CONNECTION_STRING );

                System.out.println( exception.getMessage() );

                Throwable cause = exception.getCause();

                if( cause != null ){
                    System.out.println( "Caused by " + cause.getMessage() );
                }
           }
       }

     /**
     * Subscribe to topics.
     * @param client Client to perform subscription operation on.
     * @throws MqttException
     *           If an error occurs while attempting subscription operation.
     * @throws MqttNotConnectedException
     *           If a subscription attempt occurs without a connection.
     */
     protected void subscribe( MqttClient client ) throws MqttException,
        MqttNotConnectedException {

         client.subscribe( TOPICS, QOS_VALUES );

     }
}

Callbacks

The example client is now connected to a broker and subscribed to three topics. The purpose of subscribing is to receive and process messages on the selected topics. Currently, the MQTT client is quietly discarding all messages it receives. To process messages and events, a callback handler must be created and registered with the MQTT client. Callback handlers come in two flavors: a simple callback handler and an advanced callback handler. They are defined by the MqttSimpleCallback and MqttAdvancedCallback interfaces, respectively. Most subscribers do not need the additional advanced callback features and can implement the simple callback interface. An object that implements the simple interface must provide two methods, connectionLost and publishArrived.

The connectionLost method is called when the connection to the broker has been unexpectedly terminated. If such an event occurs an attempt can be made to reconnect to the broker. If the connectionLost method is unable to reconnect, an exception should be thrown to notify the client that reconnecting failed.
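
A reconnection attempt of this kind is often written as a bounded retry loop. The sketch below shows one possible shape using only the Java standard library. ReconnectSketch, ConnectAttempt and reconnect are hypothetical names; in a real handler, the attempt passed in would presumably call the client's connect method with the same three parameters used for the original connection. The retry count and delay are arbitrary example values.

```java
/** Hypothetical retry helper for illustration; not part of the MQTT client library. */
class ReconnectSketch {

    /** Stands in for a call such as client.connect( ... ) that may fail. */
    interface ConnectAttempt {
        void connect() throws Exception;
    }

    /**
     * Tries to connect up to maxAttempts times, pausing between failures.
     * Returns the number of attempts used, or throws if every attempt failed.
     */
    static int reconnect(ConnectAttempt attempt, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                attempt.connect();
                return i; // connected again
            } catch (Exception e) {
                lastFailure = e;
                if (i < maxAttempts) {
                    Thread.sleep(delayMillis); // wait before the next try
                }
            }
        }
        // Mirrors the contract described above: report failure with an exception.
        throw new Exception("Unable to reconnect after " + maxAttempts + " attempts",
                            lastFailure);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = { 0 };
        // Simulated broker that is unreachable for the first two attempts.
        int used = reconnect(() -> {
            if (++calls[0] < 3) {
                throw new Exception("broker unreachable");
            }
        }, 5, 100L);
        System.out.println("Reconnected after " + used + " attempt(s)");
    }
}
```

If the client connected with cleanstart set to true, it would likely also need to subscribe again after a successful reconnect, because the broker does not keep state for such a client.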

The publishArrived method provides notification that a message on a subscribed topic has been sent by the broker to the client. The method signature for publishArrived has four parameters: three provide metadata and one provides the message payload. The first parameter, of type String, is the topic this message arrived on. The second parameter, a byte array, contains the message data. The third parameter, an integer, specifies the QoS level at which the broker delivered the message.

The last parameter is a boolean flag termed retained. If it is true, the message is the "last known good" value published to that topic. If it is false, the message received is a live, current value. The last retained message published on a topic is sent to a client immediately upon subscription to that topic. This flag is provided because some clients need to process retained messages differently.
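
As an illustration of treating retained messages differently, the sketch below labels each message as retained or live before displaying it. PayloadSketch and render are hypothetical names, and decoding the payload as UTF-8 text is an assumption about what the publisher sent; the payload is only a byte array, so publisher and subscriber must agree on its format.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the MQTT client library. */
class PayloadSketch {

    /** Formats the four publishArrived values, marking retained messages. */
    static String render(String topicName, byte[] payload, int qos, boolean retained) {
        // Assumption: the publisher sent UTF-8 encoded text.
        String text = new String(payload, StandardCharsets.UTF_8);
        String kind = retained ? "retained" : "live";
        return "[" + kind + "] " + topicName + " (QoS " + qos + "): " + text;
    }

    public static void main(String[] args) {
        byte[] payload = "21.5".getBytes(StandardCharsets.UTF_8);
        System.out.println(render("Weather/London/Temperature", payload, 0, true));
    }
}
```

A graphing application, for example, might plot live values as they arrive but use a retained value only to fill in its initial display.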

Once an implementation of the simple callback interface is created, it must be registered with the client. The registerSimpleHandler method of MqttClient provides this functionality. The following code example, extended from Listing 2, is a fully functional MQTT subscriber. The MqttSimpleCallback interface is implemented with an inner class.

Listing 3. Callback registration. An inner class is added to implement the MqttSimpleCallback interface, and that handler is registered with the client.
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttSimpleCallback;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttNotConnectedException;

/**
* Example of an Mqtt Subscription
**/
public class SubscriptionExample {

   /* MQTT Connection Parameters */
   private final static String CONNECTION_STRING = "tcp://brokerhost:1883";
   private final static boolean CLEAN_START = true;
   private final static short KEEP_ALIVE = 30;
   private final static String CLIENT_ID = "client1";

   private final static String[] TOPICS = { "Test/TestTopics/Topic1",
                                            "Test/TestTopics/Topic2",
                                            "Test/TestTopics/Topic3" };

   private final static int[] QOS_VALUES = { 0, 1, 2 };

       /**
       *  Main method.  Creates a new Subscription Example Object.
       **/
       public static void main(String[] args){
          new SubscriptionExample();
       }

       /**
       * Constructor for Subscription Example.
       * Creates and connects a client. Initiates
       * subscriptions to topics.
       */
       public SubscriptionExample(){

           try{
               MqttClient client = new MqttClient( CONNECTION_STRING );

               CallbackHandler callbackHandler = new CallbackHandler();
               client.registerSimpleHandler( callbackHandler );

               client.connect( CLIENT_ID, CLEAN_START, KEEP_ALIVE );

               try{
                  subscribe( client );
               }
               catch( MqttNotConnectedException exception ){

                    System.out.println( "Connection did not exist " +
                                        "during subscription attempt." );

                    System.out.println( exception.getMessage() );
               }
               catch( MqttException exception ){

                    System.out.println( "An error occurred subscribing to topics." );
                    System.out.println( exception.getMessage() );

                    Throwable cause = exception.getCause();
                    if( cause != null ){
                        System.out.println( "Caused by " + cause.getMessage() );
                    }
               }
           }
           catch( MqttException exception ){

                System.out.println( "An error occurred connecting to broker " +
                                    CONNECTION_STRING );

                System.out.println( exception.getMessage() );

                Throwable cause = exception.getCause();

                if( cause != null ){
                    System.out.println( "Caused by " + cause.getMessage() );
                }
           }
       }

     /**
     * Subscribe to topics.
     * @param client Client to perform subscription operation on.
     * @throws MqttException
     *           If an error occurs while attempting subscription operation.
     * @throws MqttNotConnectedException
     *           If a subscription attempt occurs without a connection.
     */
     protected void subscribe( MqttClient client ) throws MqttException,
        MqttNotConnectedException {

         client.subscribe( TOPICS, QOS_VALUES );

     }

     /*
     * Inner class for Mqtt Callbacks
     */
     class CallbackHandler implements MqttSimpleCallback {

         /**
         * Connection has been lost to broker.
         * API requests that we reconnect. This example just gives up.
         * @throws Exception If unable to re-connect.
         */
         public void connectionLost() throws Exception {
             System.out.println( "Connection between client and broker has been lost." );
             throw new Exception("Did not even try to reconnect.");
         }

         /**
         * Message has been received on a topic we are subscribed to.
         * Print out the metadata and the message payload.
         * @param topicName Topic message arrived on.
         * @param payload
         *           Message payload.  This implementation treats the
         *           byte array as a String.
         * @param QoS Message Quality of Service.
         * @param retained Message retention setting.
         */
         public void publishArrived( String topicName, byte[] payload, int QoS,
            boolean retained ){

             System.out.println( "A message has arrived on topic " + topicName +
                                 " at QoS " + QoS + " with a retention value of " +
                                 retained + ".\nThe message is: " +
                                 new String( payload ) );

         }
     }
}

Conclusion

MQTT is a powerful transport for the publish/subscribe messaging paradigm. It is especially useful in situations where a small client footprint and low message overhead are required. This article has explained how to create a fully functional MQTT subscription client. The example client connected to a broker, subscribed to topics, and processed messages. Look for a forthcoming article by Andy and Brian explaining how to write an MQTT publisher.

