Using the IBM Lotus Expeditor micro broker MQTT client to publish messages

Messaging middleware provides reliable and flexible connectivity services in the context of a business integration solution. One of IBM's many messaging middleware technologies is MQ Telemetry Transport (MQTT), a protocol supported by the Lotus Expeditor micro broker. MQTT is a TCP/IP-based publish/subscribe messaging protocol designed for low-overhead communication. The micro broker is a small message broker (less than 2 MB of Java code) that is designed to be deployed on small, appliance-type devices, often at locations remote from the enterprise data center. In this article, you create a sample publisher that connects to a Lotus Expeditor micro broker, publishes to a topic, and verifies the broker's receipt of that message. By the end of this article, you are equipped with the knowledge required to create a simple MQTT publisher for your business.

The history, design impetus, protocol characteristics, and instructions for subscribing to an MQTT-compatible broker are available in another article, "Using MQ Telemetry Transport with WebSphere Business Integration, Part 1: Subscribing." This article extends the concepts of that article and explains how to publish messages using MQTT. The other article covered subscribing with IBM WebSphere Message Broker; this article explores message publication with the newer Lotus Expeditor micro broker. The material covered here is also directly applicable to WebSphere Message Broker because both brokers support the MQTT protocol.

This article is a guide to writing an MQTT publisher application in Java. The sample application detailed in this article simulates the publication of notification messages for the arrival of a flight at an airport. To simplify the example, we limit the scenario to two air carriers (Air Freedom and Northern Air) and two airports (Raleigh-Durham/RDU and London Heathrow/LHR). The MQTT library available with the Lotus Expeditor micro broker is required to compile this article's example. Additionally, to connect and publish messages with the example MQTT client, a running installation of Lotus Expeditor micro broker is required.

Topics

In publish/subscribe messaging, message destinations are called topics. The MQTT protocol has a hierarchical topic space, which means that topics may be structured in such a way that subscribers and publishers may specify the destination topic using varying degrees of precision. MQTT enforces few rules about the topic space, and you are responsible for designing a logical information space that makes sense for your application.

Experience has shown that a more verbose topic space is preferable to a terse, less descriptive space. For example, using a, b, or c as topics is not a recommended practice. Although you can save bandwidth by using short topics, a well-designed topic space that is more descriptive and that allows the use of wildcards in subscribing applications has substantial benefits.

A topic is a free-form string and may be composed of any single-byte character code. The characters /, +, and #, however, have special meanings discussed later in this article. The topic length can be anything up to 32,767 characters.

Figure 1 is an example of a logically constructed topic space for flight arrival and departure messages. This topic space is referred to throughout the article's examples.

Figure 1. Topic space

The hierarchy is constructed as follows: Flight Times - Airport - Airline - Arrivals or Departures - Flight Number. When converting the topic hierarchy to a string, use the special forward slash (/) character to delimit the levels of the hierarchy; each level of the topic adds an extra logical layer of precision. For example, to publish a message about Air Freedom flight 1326 arriving at RDU, the message topic is Flight Times/RDU/Air Freedom/Arrivals/Flight 1326.

This logically constructed topic space allows disparate groups of subscribers to create subscriptions that deliver only messages of interest. A poorly planned topic space often results in client over-subscription, in which the client subscribes to a broader topic space than required and thus receives extraneous messages. The client application must then filter those extraneous messages to find the messages of interest. Over-subscription results in wasted bandwidth and reduced client efficiency.
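As a minimal sketch of the conversion described above, the following helper joins the hierarchy levels with the / delimiter. The class FlightTopics and its method are hypothetical names used only for illustration; they are not part of the MQTT client library, and the check that rejects /, +, and # inside a level is an assumption about what a careful publisher would want.

```java
import java.util.List;

/** Hypothetical helper, not part of the MQTT client library. */
final class FlightTopics {

    private static final String ROOT = "Flight Times";

    private FlightTopics() {}

    /** Builds Flight Times/[airport]/[airline]/[direction]/Flight [number]. */
    static String flightTopic(String airport, String airline,
                              String direction, int flightNumber) {
        // Assumption: a single level should not contain the delimiter or
        // the wildcard characters, which have special meanings in MQTT.
        for (String level : List.of(airport, airline, direction)) {
            if (level.isEmpty() || level.contains("/")
                    || level.contains("+") || level.contains("#")) {
                throw new IllegalArgumentException("Invalid topic level: " + level);
            }
        }
        return String.join("/", ROOT, airport, airline, direction,
                "Flight " + flightNumber);
    }

    public static void main(String[] args) {
        // Prints: Flight Times/RDU/Air Freedom/Arrivals/Flight 1326
        System.out.println(flightTopic("RDU", "Air Freedom", "Arrivals", 1326));
    }
}
```

Building topics in one place like this keeps publishers from drifting away from the agreed hierarchy.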

In the following sections, we describe three example groups of distinct subscribers interested in worldwide flight arrival and departure times:

  • A person meeting a flight arriving at an airport
  • An airport displaying flight information
  • An airline tracking the percentage of on-time flights

In the first scenario, a person with an MQTT client application on his or her phone is meeting someone at the airport. In this case, the person is interested in only one flight (Air Freedom flight 1024) at one specific airport (LHR). To receive notification of the arrival of this flight, the client application subscribes to Flight Times/LHR/Air Freedom/Arrivals/Flight 1024.

In the second subscription scenario, Heathrow airport (LHR) displays its arrival and departure information to airport patrons. In this scenario, the client is interested only in arrivals and departures for LHR. The logical topic space allows for a single subscription to request delivery of all information about arrivals and departures into and out of LHR. The required subscription topic string is Flight Times/LHR/#. The # character is a special wildcard that matches all topics to the right of its position in the topic space, and it can be thought of as subscribing to a subtree of the topic space.

The last scenario involves an airline tracking its on-time statistics. Our example airline, Northern Air, is concerned with its worldwide on-time percentage. Therefore, Northern Air requires a single subscription to its worldwide arrival times. In this example, Northern Air is concerned only with arrivals, not departures. The topic string for Northern Air's specific interest in this case is Flight Times/+/Northern Air/Arrivals/#. This topic string uses the special wildcard + character, which enables Northern Air to subscribe to arrivals for all airports. Unlike the # character, it matches only one level in the hierarchy of the topic space and doesn't greedily match all lower levels in the hierarchy as the # character does.
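The wildcard behavior in the three scenarios can be illustrated with a small matcher. This is only a sketch of the matching rules as described in this article, assuming that # appears only as the last level of a subscription; the real matching is performed by the broker, and the class name TopicMatcher is hypothetical.

```java
/** Hypothetical illustration of MQTT wildcard matching; the broker does the real work. */
final class TopicMatcher {

    private TopicMatcher() {}

    /** Returns true if the subscription filter matches the published topic. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' matches the whole remaining subtree; it is only
                // accepted here as the final level of the filter.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter is more specific than the topic
            }
            // '+' matches exactly one level; anything else must be equal.
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        String arrival = "Flight Times/LHR/Northern Air/Arrivals/Flight 88";
        System.out.println(matches("Flight Times/LHR/#", arrival));
        System.out.println(matches("Flight Times/+/Northern Air/Arrivals/#", arrival));
        System.out.println(matches("Flight Times/+/Air Freedom/Arrivals/#", arrival));
    }
}
```

The first two checks succeed and the third fails, because + stands in for any single airport but the airline level must still match literally.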

When you create your MQTT application, you must carefully plan the topic space to be logical and to enable flexible subscriptions. This approach ensures that most consumers of publications neither require multiple subscriptions nor need to over-subscribe.

Connecting MQTT

Publishing messages with MQ Telemetry Transport requires a connection to a Lotus Expeditor micro broker or another messaging server that supports the MQTT protocol, such as WebSphere Message Broker. Creating that connection takes several steps. First, an MQTT properties object is constructed and supplied to the client creation factory. This properties object provides the configuration for the instantiated client.

One of these properties is a Boolean flag that specifies whether the client application is a clean session client. If the flag is true, each time the client connects, it does so without any knowledge of a previous connection to the broker (such as any subscriptions that were made previously or any messages awaiting delivery). If the flag is false, the client state remains in place across connections with the broker; for example, the client application does not need to re-subscribe every time it reconnects. Additionally, with clean session set to false, the client and broker attempt to resume any in-progress message exchange (depending on the quality of service specified for the messages) that was interrupted when the connection was broken. To use a non-clean session client, an implementation of the MqttPersistence interface must be provided. Supplying an implementation of this interface indicates to the client creation factory that the client application requires persistent (reliable) message delivery. This example uses a clean session client, on the assumption that the network is sufficiently reliable.

After the properties are configured, an MQTT client instance is obtained from the MQTT client factory. Creating an MQTT client instance requires several parameters, including a unique client ID, a broker IP address and port, and the optional MqttProperties object described previously.

The client ID identifies each client to the broker. Its primary purpose is to enable the transfer of persistent messages and to maintain subscription state across a number of connections and disconnections by the client. Each client connecting to a broker must have a different client ID. If two clients attempt to use the same client ID when connecting to a broker, the last to connect is honored, and the previously connected one is forcibly disconnected. This behavior is designed to let a client reconnect when its previous connection has not yet been fully cleaned up. The client ID can be up to 23 characters long. Listing 1 shows how to create and connect the client.
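Because client IDs must be unique per broker and no longer than 23 characters, an application that runs in several places at once may want to generate them. The following is a minimal sketch under that assumption; the ClientIds class and the prefix-plus-random-suffix scheme are hypothetical and not something the MQTT client library provides.

```java
import java.util.UUID;

/** Hypothetical helper for producing client IDs within the 23-character limit. */
final class ClientIds {

    static final int MAX_CLIENT_ID_LENGTH = 23;

    private ClientIds() {}

    /** Returns prefix + "-" + 8 random hex characters, truncating the prefix if needed. */
    static String uniqueClientId(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        // Leave room for the separator and the suffix.
        int room = MAX_CLIENT_ID_LENGTH - suffix.length() - 1;
        String head = prefix.length() > room ? prefix.substring(0, room) : prefix;
        return head + "-" + suffix;
    }

    public static void main(String[] args) {
        String id = uniqueClientId("testClient");
        System.out.println(id + " (" + id.length() + " characters)");
    }
}
```

A random suffix suits a clean session client such as the one in this article. A client that relies on state being kept across connections needs a stable ID instead, because the broker associates that state with the client ID.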

Listing 1. Connecting
   /**
    * Create an MqttClient object after configuring the MqttProperties object
    * as required.
    */
   private MqttClient createClient() throws MqttException {

       MqttProperties mqttProps = new MqttProperties();
       // Stateless "clean session" client
       mqttProps.setCleanStart(true);

       /**
        * Create the client from the factory. The client ID for this client is
        * "testClient" and the URL in the second parameter describes the
        * location of the broker, in this case, on the local machine.
        */
       MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
               "testClient", "tcp://mybroker:1883", mqttProps);

       return mqttClient;
   }


   /**
    * Connect the MqttClient to a broker.
    * 
    * @throws MqttException
    *             If an error occurs during connection operations.
    */
   private void connect() throws MqttException {

       /**
        * Register this application for callbacks from the client
        */
       client.registerCallback(this);

       /**
        * Connect the client to a broker.
        */
       client.connect();

   }

Publishing

After the client is successfully connected, messages may be published. Applications publish through an MQTT client object. The method signature for publishing a message is int publish(String, MqttPayload, byte, Boolean). The four parameters are as follows:

  • String. The topic parameter is of type string, and this string is used by the broker to match the publication against the interests of subscribers (specified using the subscription topic syntax described previously).
  • MqttPayload. The second parameter is an MqttPayload object. The MqttPayload object contains both the application data and any protocol headers for this publication. An offset is provided so that applications can determine where in the MqttPayload the data begins. This enables access to the underlying byte array without having to create additional copies after the data is written to the network. Additionally, access is provided to directly manipulate the data within the payload after object construction and before transmission.
  • Byte. The third parameter, a byte, is the Quality of Service (QoS) for this publication. QoS has three legal values: 0, 1, or 2:

    • A QoS of 0 denotes that the publisher and broker attempt one-time delivery of the message but do not take steps above and beyond those provided by TCP/IP to ensure message delivery. This level is sometimes called fire and forget because the message is sent to its destination without verification of receipt.
    • A QoS setting of 1 specifies that the message is guaranteed to be delivered to the broker; however, it may be delivered more than once.
    • A QoS value of 2 instructs MQTT to deliver the message once and only once.
    Each increase in QoS level results in additional processor and network overhead. QoS choice can have an effect on the overall scalability of your messaging solution, and it puts increased responsibility on the client to store undelivered messages. Therefore, take care to choose the appropriate QoS level for each message published. In general, try to use lower QoS values unless requirements dictate the need for more stringent assurance of delivery. The QoS value supplied with the message specifies the Quality of Service for the publication between the client and the broker. Additionally, the value designates the maximum QoS level the broker uses to deliver this message to its subscribers.

    Subscribers can specify a maximum QoS for delivery of messages on a per-topic basis, so a message published at QoS 2 may not be delivered to its subscribers at that level. A subscriber may request a downgraded QoS for the messages it receives. Although you may think it strange that a publisher does not control the end-to-end QoS for its messages, the result is increased flexibility for message consumers. When a published message is sent to a subscriber, the broker delivers the publication at either the maximum QoS specified by the subscriber during the subscription process or at the QoS of the published message, if that is lower. For example, a message published at QoS 2 to a subscriber that specified QoS 1 for that topic is delivered at QoS 1. A QoS 0 message published to the same subscriber on that topic is sent to the subscriber at QoS 0.
  • Boolean. The fourth parameter, a Boolean flag, indicates if it is a retained publication. A retained publication is held within the broker as the last message received for a given topic. Retained publications enable subsequent subscribers to receive the most recent message on a topic as soon as they subscribe to it, even if they connect after the message was published. This is very useful for populating a display application as soon as it is started, and then updating it with subsequent changes to the information. If this flag is set to false, only subscribers currently subscribed to that topic receive the message. The example shown in listing 2 uses a non-retained publication.
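The delivery rule described for the QoS parameter (the broker uses the lower of the published QoS and the subscriber's maximum QoS) can be expressed in a few lines. This is a sketch of the rule only, with a hypothetical class name; the broker applies the rule itself, and client code does not need to compute it.

```java
/** Hypothetical illustration of how the delivery QoS to a subscriber is chosen. */
final class QosRules {

    private QosRules() {}

    /** Returns the QoS at which the broker delivers a publication to a subscriber. */
    static byte effectiveQos(byte publishedQos, byte subscribedMaxQos) {
        requireValid(publishedQos);
        requireValid(subscribedMaxQos);
        // The lower of the two values wins.
        return (byte) Math.min(publishedQos, subscribedMaxQos);
    }

    private static void requireValid(byte qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1, or 2: " + qos);
        }
    }

    public static void main(String[] args) {
        // Published at QoS 2, subscriber asked for QoS 1: delivered at 1.
        System.out.println(effectiveQos((byte) 2, (byte) 1));
        // Published at QoS 0, subscriber asked for QoS 1: delivered at 0.
        System.out.println(effectiveQos((byte) 0, (byte) 1));
    }
}
```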

The publish method returns an integer message ID. This integer can be used in conjunction with a registered MqttAdvancedCallback implementation to detect when a message has been received by the broker.

The code in listing 2 publishes a message to indicate that Air Freedom's flight 1024 has arrived at London Heathrow Airport (LHR).

Listing 2. Publishing
   /**
    * Invoke from the command line with a single parameter, the broker URI,
    * e.g. tcp://mybroker:1883.
    */
   public static void main(String args[]) {

       MqttPublisher publisher = null;

       try {

           publisher = new MqttPublisher(args[0]);

           /**
            * Connect the newly created publisher to the supplied broker.
            */
           publisher.connect();

           /**
            * Publish an "Arrival" message.
            */
           publisher.publishMessage(
                   "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
                   (byte) 2, "Arrived");

           /**
            * Sleep for 1 second waiting to receive notification of
            * publication. Real applications should use appropriate
            * inter-thread signaling mechanisms such as wait/notify,
            * cyclic barriers or latches.
            */
           Thread.sleep(1000);

       }
       catch (MqttException exception) {
           System.err.println("Exception occurred during either instantiation, "
                   + "connection, or publication: "
                   + exception.getMessage());
       }
       catch (InterruptedException exception) {
           System.err.println("Interrupted while waiting for publication: "
                   + exception.getMessage());
       }
       finally {
           try {
               /**
                * Close the publisher if instantiated.
                */
               if (publisher != null) {
                   publisher.disconnectClient();
               }
           }
           catch (MqttException exception) {
               System.err.println("Exception occurred closing publisher: "
                       + exception.getMessage());
           }
       }
   }


   /**
    * Construct a new MqttPublisher containing an unconnected MqttClient.
    * 
    * @param brokerURL
    *            Broker URL to (eventually) connect to.
    * @throws MqttException
    *             If an underlying MQTT error occurs instantiating the client
    *             object.
    */
   private MqttPublisher(String brokerURL) throws MqttException {
       this.brokerURL = brokerURL;
       this.client = createClient();
   }
 
   /**
    * Publish a string as a message in byte form with the given quality of
    * service to the given topic.
    */
   public void publishMessage(String topic, byte qos, String message)
           throws MqttException {
       client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
               false);
   }
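Listing 2 converts the message text with message.getBytes(), which uses the platform's default character set. If publishers and subscribers may run on differently configured machines, one option is to agree on an explicit encoding. The following sketch assumes UTF-8 and a Java version that provides StandardCharsets; the PayloadEncoding class is hypothetical, and the offset parameter mirrors the payload offset described earlier.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that fixes the text encoding of message payloads to UTF-8. */
final class PayloadEncoding {

    private PayloadEncoding() {}

    /** Encodes message text into the bytes to place in a payload. */
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes message text from a payload whose data starts at the given offset. */
    static String decode(byte[] payload, int offset) {
        return new String(payload, offset, payload.length - offset,
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = encode("Arrived");
        System.out.println(decode(bytes, 0));
    }
}
```

With a helper like this, the byte array passed to the payload constructor in listing 2 would come from encode(message) rather than from message.getBytes().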

Callbacks

The example client is now connected to a broker and capable of publishing messages. As with the subscribing clients described in our other article, callbacks provide enhanced functionality for publishers. To receive acknowledgments of publications, a callback handler must be created and registered with the MQTT client object. There are two types of callback handlers: a simple callback handler and an advanced callback handler. They are defined, respectively, by the MqttCallback interface and the MqttAdvancedCallback interface. The MqttAdvancedCallback interface extends MqttCallback, so when you use the advanced callback interface, you must also implement the methods defined in the simple callback interface. To use the advanced interface, you must implement three methods, subscribed(int, byte[]), unsubscribed(int), and published(int), in addition to the methods inherited from the simple callback interface.

The first two callback methods, subscribed(int, byte[]) and unsubscribed(int), are for clients that want to monitor subscription acknowledgments. The subscribed method is called by the client when a subscription request has been acknowledged by the broker. Likewise, the second method, unsubscribed, is invoked when a request to unsubscribe from a topic has been acknowledged. Because this example focuses on publishing, these two methods are not used by the example client; however, skeletal implementations are required for successful code compilation.

The method of most interest to a publishing client is the published(int) method. This method provides notification that a message has been successfully delivered to the broker. The method has one parameter, an integer, messageID. Applications may want to match this messageID with the messageID returned by the publish method. The callback is invoked only for messages published with a QoS value of 1 or 2. The use of client-side persistence provided by an MqttPersistence implementation, coupled with the use of QoS 1 or 2, renders this callback redundant for the assurance of message delivery. The MQTT client tracks any publications that were in flight in the event of a connection failure, and it attempts to complete the message delivery upon reconnection. Some applications, though, may find the callback useful for pacing message delivery or providing their own delivery assurance semantics.
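The comment in listing 2 recommends inter-thread signaling such as latches instead of a fixed sleep. One possible way to match the ID returned by publish with the ID passed to published(int) is sketched below. The PublishTracker class is hypothetical and independent of the MQTT library: the application would call acknowledged(messageID) from its published(int) callback and awaitAck(...) after calling publish.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that pairs publish() message IDs with published(int) callbacks. */
final class PublishTracker {

    private final Map<Integer, CountDownLatch> pending = new ConcurrentHashMap<>();

    // Creating the latch on first use from either side means it does not
    // matter whether the callback or the waiting thread gets there first.
    private CountDownLatch latchFor(int messageId) {
        return pending.computeIfAbsent(messageId, id -> new CountDownLatch(1));
    }

    /** Call from the published(int) callback. */
    void acknowledged(int messageId) {
        latchFor(messageId).countDown();
    }

    /** Waits for the acknowledgment; returns false if the timeout elapses first. */
    boolean awaitAck(int messageId, long timeout, TimeUnit unit)
            throws InterruptedException {
        boolean acked = latchFor(messageId).await(timeout, unit);
        if (acked) {
            pending.remove(messageId); // the ID may be reused later
        }
        return acked;
    }

    public static void main(String[] args) throws InterruptedException {
        PublishTracker tracker = new PublishTracker();
        int messageId = 7; // stands in for the value returned by publish(...)
        // Simulate the client library invoking the callback on another thread.
        new Thread(() -> tracker.acknowledged(messageId)).start();
        System.out.println(tracker.awaitAck(messageId, 1, TimeUnit.SECONDS));
    }
}
```

Compared with Thread.sleep(1000), the waiting thread resumes as soon as the acknowledgment arrives and can tell a timeout apart from a successful delivery.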

To receive notifications, an implementation of the advanced callback interface must be registered with the MQTT client. The registerCallback method provides registration features for the MQTT client. The code example in listing 3, extended from listing 2, is a fully functional MQTT publisher. The MqttAdvancedCallback interface is implemented by the class and registered with the previously created MQTT object. The class can be started from the command line with a single parameter containing a broker URI, for example, tcp://mybroker:1883.

Conclusion

MQTT is a powerful transport for the publish/subscribe messaging paradigm. It has enhanced utility over other publish/subscribe protocols in situations where a small client and low network overhead are required. This article explained how to create a fully functional MQTT publisher. The example client connects to a broker and publishes messages to a topic. The example also illustrated the use of the MqttAdvancedCallback interface for notification of message delivery to the broker.


Publish date: July 24, 2007