Using the IBM Lotus Expeditor micro broker MQTT client to publish messages

Learn how to use the IBM Lotus Expeditor micro broker to support the MQ Telemetry Transport (MQTT) publish/subscribe messaging protocol. Create a sample publisher, publish to a topic, and verify the receipt of the message.

Share:

Brian O'Connell (boc@us.ibm.com), Advisory Software Engineer, High Performance On Demand Services, IBM

Brian O'Connell is a software engineer at IBM in Research Triangle Park, NC. He works for the Events Infrastructure team. Brian's expertise includes concurrent programming in Java, scalable I/O designs, and publish/subscribe systems. He is the lead architect and developer of the publish/subscribe system that supports IBM-sponsored sporting event Web sites. Brian received a Bachelor of Science degree in Computer Science from Virginia Polytechnic Institute and State University. You can reach him at boc@us.ibm.com.



Andy Stanford-Clark (andysc@uk.ibm.com), Distinguished Engineer, Master Inventor, Manager, Pervasive Messaging Technologies, IBM 

Andy Stanford-Clark is an IBM Distinguished Engineer and designated Master Inventor, based at IBM's Hursley Park Development Lab in the UK. He specializes in message broker technology, remote telemetry, Internet technologies, and pervasive computing. He leads an Advanced Technology team, building the technology to integrate data from remote monitoring and control devices into business applications. Andy holds a BSc degree in Computer Science and Mathematics, and a PhD in Parallel Computing, both from the University of East Anglia, Norwich, UK. You can contact Andy at andysc@uk.ibm.com.



Martin Gale (martin_gale@uk.ibm.com), Senior IT Specialist, Master Inventor, Lead Developer of the Lotus Expeditor micro broker, IBM

Martin Gale is the lead developer of the Lotus Expeditor micro broker and is an appointed IBM Master Inventor. Martin works in the IBM UK Software Development Laboratory in Hursley. Martin's background is in exploitation of emerging technologies for the Web and pervasive channels. Prior to his work in software development, Martin worked in a services role working with IBM's customers to develop first-of-a-kind solutions using new technologies. Martin holds a first class BSc degree in Computer Science from Portsmouth University for which he was sponsored by IBM. You can reach Martin at martin_gale@uk.ibm.com.



24 July 2007

Also available in Chinese Russian Japanese

Messaging middleware provides reliable and flexible connectivity services in the context of a business integration solution. One of IBM's many messaging middleware technologies is MQ Telemetry Transport (MQTT), a protocol supported by the Lotus Expeditor micro broker. MQTT is a TCP/IP-based publish/subscribe messaging protocol, designed for communication over low-overhead networks. The micro broker is a small message broker (less than 2 MB of Java code) that is designed to be deployed on small, appliance-type devices, often at locations remote from the enterprise data center. In this article, you create a sample publisher that is capable of connecting to a Lotus Expeditor micro broker, publish to a topic, and verify the broker's receipt of that message. By the end of this article, you are equipped with the knowledge required to create a simple MQTT publisher for your business.

The history, design impetus, protocol characteristics, and instructions for subscribing to an MQTT-compatible broker are available in another article, "Using MQ Telemetry Transport with WebSphere Business Integration, Part 1: Subscribing." This article extends the concepts of the previous article and goes on to explain how to publish messages using MQTT. The other article covered subscribing, using IBM Websphere Message Broker; this article explores message publication with the newer Lotus Expeditor micro broker. The aspects covered by this article are also directly applicable to WebSphere Message Broker because both support the MQTT protocol.

This article is a guide to writing an MQTT publisher application in Java. The sample application detailed in this article simulates the publication of notification messages for the arrival of a flight at an airport. To simplify the example, we limit the scenario to two air carriers (Air Freedom and Northern Air) and two airports (Raleigh-Durham/RDU and London Heathrow/LHR). The MQTT library available with the Lotus Expeditor micro broker is required to compile this article's example. Additionally, to connect and publish messages with the example MQTT client, a running installation of Lotus Expeditor micro broker is required.

Topics

In publish/subscribe messaging, message destinations are called topics. The MQTT protocol has a hierarchical topic space, which means that topics may be structured in such a way that subscribers and publishers may specify the destination topic using varying degrees of precision. MQTT enforces few rules about the topic space, and you are responsible for designing a logical information space that makes sense for your application.

Experience has shown that a more verbose topic space is preferable to a terse, less descriptive space. For example, using a, b, or c as topics is not a recommended practice. Although you can save bandwidth by using short topics, a well-designed topic space that is more descriptive and that allows the use of wildcards in subscribing applications has substantial benefits.

A topic is a free-form string and may be composed of any single-byte character code. The characters /, +, and #, however, have special meanings discussed later in this article. The topic length can be anything up to 32,767 characters.

Figure 1 is an example of a logically constructed topic space for flight arrival and departure messages. This topic space is referred to throughout the article's examples.

Figure 1. Topic space
Topic space

The hierarchy is constructed as follows: Flight Times - Airport - Airline - Arrivals or Departures - Flight Number. When converting the topic hierarchy to a string, use the special forward slash (/) character to delimit the topic hierarchy with each section of the topic adding an extra logical layer of precision. For example, to publish a message about Air Freedom flight 1326 arriving at RDU, the message topic is Flight Times/RDU/Air Freedom/Arrivals/Flight 1326. This logically constructed topic space allows disparate groups of subscribers to create subscriptions that deliver only messages of interest. A poorly planned topic space often results in client over-subscription, in which the client subscribes to a broader topic space than required, and thus receives extraneous messages. The extraneous messages require additional message filtering by the client application to determine the messages of interest. Over-subscription results in wasted bandwidth and reduced client efficiency.

In the following sections, we describe three example groups of distinct subscribers interested in worldwide flight arrival and departure times:

  • A person meeting a flight arriving at an airport
  • An airport displaying flight information
  • An airline tracking the percentage of on-time flights

In the first scenario, a person with an MQTT client application on his or her phone is meeting someone at the airport. In this case, the person is interested in only one flight (Air Freedom flight 1024) at one specific airport (LHR). To receive notification of the arrival of this flight, the client application subscribes to Flight Times/LHR/Air Freedom/Arrivals/Flight 1024.

In the second subscription scenario, Heathrow airport (LHR) displays its arrival and departure information to airport patrons. In this scenario, the client is interested only in arrivals and departures for LHR. The logical topic space allows for a single subscription to request delivery of all information about arrivals and departures into and out of LHR. The required subscription topic string is Flight Times/LHR/#. The # character is a special wildcard that matches all topics to the right of its position in the topic space, and it can be thought of as subscribing to a subtree of the topic space.

The last scenario involves an airline tracking its on-time statistics. Our example airline, Northern Air, is concerned with its worldwide on-time percentage. Therefore, Northern Air requires a single subscription to its worldwide arrival times. In this example, Northern Air is concerned only with arrivals, not departures. The topic string for Northern Air's specific interest in this case is Flight Times/+/Northern Air/Arrivals/#. This topic string uses the special wildcard + character, which enables Northern Air to subscribe to arrivals for all airports. Unlike the # character, it matches only one level in the hierarchy of the topic space and doesn't greedily match all lower levels in the hierarchy as the # character does.

When you create your MQTT application, you must carefully plan the topic space to be logical and to enable flexible subscriptions. This approach ensures that most consumers of publications do not require multiple subscriptions or do not need to over-subscribe.


Connecting MQTT

Publishing messages with MQ Telemetry Transport requires a connection to a Lotus Expeditor micro broker or other messaging server supporting the MQTT protocol, such as WebSphere Message Broker. Several steps are required to create a connection to the broker. An MQTT properties object is constructed and supplied to the client creation factory. This properties object provides configuration for the instantiated client. One of these properties is a Boolean flag, specifying if the client application is a clean session client. If true, each time a client connects, it does so without any prior knowledge of a previous connection to the broker (such as any subscriptions that were made previously or any messages awaiting delivery). If this flag is false, the client state remains in place across connections with the broker; for example, the client application does not need to re-subscribe every time it subsequently reconnects. Additionally, with clean session set to false, the client and broker attempt to resume any in-progress message exchange (depending upon the quality of service specified for the messages) that was interrupted when the connection was broken. To use a non-clean session client, an implementation of the MqttPersistence interface must be provided. The inclusion of an implementation of this interface indicates to the client creation factory that the client application requires the use of persistent (reliable) message delivery. This example utilizes a clean session client with the assumption that the network is sufficiently reliable. After properties configuration, an MQTT client instance is obtained from the MQTT client factory. Creating an MQTT client instance requires several parameters including a unique client ID, a broker IP address and port, and the optional MqttProperties object described previously.

The client ID indicates the identity of each client to the broker. This primarily enables the transfer of persistent messages and maintains subscription state, across a number of connections and disconnections by the client. It is important to note that each client connecting to a broker must have a different client ID. If two clients attempt to use the same client ID when connecting to a broker, the last to connect is honored, and the previously connected one is forcibly disconnected. This is designed to enable the reconnection of a client on a previous connection that has not yet fully cleaned up. The client ID can be up to 23 characters long. See listing 1.

Listing 1. Connecting
    /**
    * Create a MqttClient object after configuring the MqttProperties object as
    * required.
    */
   private MqttClient createClient() throws MqttException {

       MqttProperties mqttProps = new MqttProperties();
       // Stateless "clean session" client
       mqttProps.setCleanStart(true);

       /**
        * Create the client from the factory. The client ID for this client is
        * "testClient" and the URL in the second parameter describes the
        * location of the broker, in this case, on the local machine.
        */
       MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
               "testClient", "tcp://mybroker:1883", mqttProps);

       return mqttClient;
   }


   /**
    * Connect the MqttClient to a broker.
    * 
    * @throws MqttException
    *             If an error occurs during connection operations.
    */
   private void connect() throws MqttException {

       /**
        * Register this application for callbacks from the client
        */
       client.registerCallback(this);

       /**
        * Connect the client to a broker.
        */
       client.connect();

   }

Publishing

After MQTT is successfully connected, messages may be published. Applications publish through an MQTT client object. The method signature for publishing a message is int publish(String, MqttPayload, byte, Boolean). The four parameters are explained in detail:

  • String. The topic parameter is of type string, and this string is used by the broker to match the publication against the interests of subscribers (specified using the subscription topic syntax described previously).
  • MqttPayload. The second parameter is an MqttPayload object. The MqttPayload object contains both the application data and any protocol headers for this publication. An offset is provided so that applications can determine where in the MqttPayload the data begins. This enables access to the underlying byte array without having to create additional copies after the data is written to the network. Additionally, access is provided to directly manipulate the data within the payload after object construction and before transmission.
  • Byte. The third parameter, a byte, is the Quality of Service (QoS) for this publication. QoS has three legal values: 0, 1, or 2:

    • A QoS of 0 denotes that the publisher and broker attempt one-time delivery of the message but do not take steps above and beyond those provided by TCP/IP to ensure message delivery. This level is sometimes called fire and forget because the message is sent to its destination without verification of receipt.
    • A QoS setting of 1 specifies that the message is ensured to be delivered to the broker; however, it may be delivered more than once.
    • A QoS value of 2 instructs MQTT to deliver the message once and only once.
    Each increase in QoS level results in additional processor and network overhead. QoS choice can have an effect on the overall scalability of your messaging solution, and it puts increased responsibility on the client to store undelivered messages. Therefore, take care to choose the appropriate QoS level for each message published. In general, try to use lower QoS values unless requirements dictate the need for more stringent insurance of delivery. The QoS value supplied with the message specifies the Quality of Service for the publication between the client and the broker. Additionally, the value designates the maximum QoS level the broker uses to deliver this message to its subscribers.

    Subscribers can specify a maximum QoS for delivery of messages on a per-topic basis, so a message published at QoS 2 may not be delivered to its subscribers at that level. A subscriber may request a downgraded QoS for the messages it receives. Although you may think it strange that a publisher does not control the end-to-end QoS for its messages, the result is increased flexibility for message consumers. When a published message is sent to a subscriber, the broker delivers the publication at either the maximum QoS specified by the subscriber during the subscription process or at the QoS of the published message, if that is lower. For example, a message published at QoS 2 to a subscriber that specified QoS 1 for that topic is delivered at QoS 1. A QoS 0 message published to the same subscriber on that topic is sent to the subscriber at QoS 0.
  • Boolean. The fourth parameter, a Boolean flag, indicates if it is a retained publication. A retained publication is held within the broker as the last message received for a given topic. Retained publications enable subsequent subscribers to receive the most recent message on a topic as soon as they subscribe to it, even if they connect after the message was published. This is very useful for populating a display application as soon as it is started, and then updating it with subsequent changes to the information. If this flag is set to false, only subscribers currently subscribed to that topic receive the message. The example shown in listing 2 uses a non-retained publication.

The publish method returns an integer message ID. This integer can be used in conjunction with a registered MqttAdvancedCallback method to detect when a message has been received by the broker.

The code in listing 2 publishes a message to indicate that Air Freedom's flight 1024 has arrived at London Heathrow Airport (LHR).

Listing 2. Publishing
    /**
    * Invoke from the command line with a single parameter, the broker URI,
    * e.g. tcp://mybroker:1883.
    */
   public static void main(String args[]) {

       MqttPublisher publisher = null;

       try {

           publisher = new MqttPublisher(args[0]);

           /**
            * Connect the newly created publisher to the supplied broker.
            */
           publisher.connect();

           /**
            * Publish an "Arrival" message.
            */
                
           publisher.publishMessage(
                   "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
                   (byte) 2, "Arrived");

           /**
            * Sleep for 1 second waiting to receive notification of
            * publication. Real applications should use appropriate
            * inter-thread signaling mechanisms such as wait/notify, 
                     * cyclic barriers or latches.
            */
           Thread.sleep(1000);

       }
       catch (MqttException exception) {
           System.err.println("Exception occurred during either instantiation, 
           connection, or publication: "
                   + exception.getMessage());
       }
       catch (InterruptedException exception) {
           System.err.println("Interrupted while waiting for publication: "
                   + exception.getMessage());
       }
       finally {
           try {
               /**
                * Close the publisher if instantiated.
                */
               if (publisher != null) {
                   publisher.disconnectClient();
               }
           }
           catch (MqttException exception) {
               System.err.println("Exception occurred closing publisher: "
                       + exception.getMessage());
           }
       }
   }


   /**
    * Construct a new MqttPublisher containing an unconnected MqttClient.
    * 
    * @param brokerURL
    *            Broker URL to (eventually) connect to.
    * @throws MqttException
    *             If an underlying MQTT error occurs instantiating the client
    *             object.
    */
   private MqttPublisher(String brokerURL) throws MqttException {
       this.brokerURL = brokerURL;
       this.client = createClient();
   }
 
 /**
  * Publish a string as a message in byte form with the given quality of
  * service to the given topic.
  */
  public void publishMessage(String topic, byte qos, String message)  
  throws MqttException {
       client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
               false);
   }

Callbacks

The example client is connected to a broker and capable of publishing messages. Similar to the subscribing clients described in our other article, callbacks provide enhanced functionality for publishers. To receive acknowledgments of publications, a callback handler must be created and registered with the MQTT client object. There are two types of callback handlers: a simple callback handler and an advanced callback handler. They are implemented, respectively, by the MqttCallback interface and MqttAdvancedCallback interface. The MqttAdvancedCallback interface extends the MqttCallback, so when you use the advanced callback interface, you must also implement the methods defined in the simple callback interface. To use the advanced interface you must implement three methods: subscribed(int, byte[]), unsubscribed(int), and published(int) in addition to the inherited methods from the simple callback handler.

The first two callback methods, subscribed(int, byte[]) and unsubscribed(int), are for clients that want to monitor subscription acknowledgments. The subscribed method is called by the client when a subscription request has been acknowledged by the broker. Likewise, the second method, unsubscribed, is invoked when a request to unsubscribe from a topic has been acknowledged. Because this example focuses on publishing, these two methods are not used by the example client; however, skeletal implementations are required for successful code compilation.

The method of most interest to a publishing client is the published(int) method. This method provides notification that a message has been successfully delivered to the broker. The method has one parameter, an integer, messageID. Applications may want to match the messageID with the messageID returned by the publish method. The callback is invoked only for messages published with a QoS value of 1 or 2. The use of client-side persistence provided by an MqttPersistence implementation, coupled with use of QoS 1 or 2, renders the use of this callback redundant for the insurance of message delivery. The MQTT client tracks any publications that were in flight in the event of a connection failure, and it attempts to complete the message delivery upon reconnection. Some applications, though, may find it useful for pacing message delivery or providing their own delivery insurance semantics.

To receive notifications, an implementation of the advanced callback interface must be registered with the MQTT client. The registerCallback method provides registration features for the MQTT client. The code example in listing 3, extended from listing 2, is a fully functional MQTT publisher. The MqttAdvancedCallback interface is implemented by the class and registered with the previously created MQTT object. The class can be started from the command line with a single parameter containing a broker URI, for example, tcp://mybroker:1883.


Conclusion

MQTT is a powerful transport for the publish/subscribe messaging paradigm. It has enhanced utility over other publish/subscribe protocols in situations where a small client and low network overhead are required. This article explained how to create a fully functional MQTT publisher. The example client connects to a broker and publishes messages to a topic. The example also illustrated the use of the MqttAdvancedCallback interface for notification of message delivery to the broker.

Resources

Learn

Discuss

Comments

developerWorks: Sign in

Required fields are indicated with an asterisk (*).


Need an IBM ID?
Forgot your IBM ID?


Forgot your password?
Change your password

By clicking Submit, you agree to the developerWorks terms of use.

 


The first time you sign into developerWorks, a profile is created for you. Information in your profile (your name, country/region, and company name) is displayed to the public and will accompany any content you post, unless you opt to hide your company name. You may update your IBM account at any time.

All information submitted is secure.

Choose your display name



The first time you sign in to developerWorks, a profile is created for you, so you need to choose a display name. Your display name accompanies the content you post on developerWorks.

Please choose a display name between 3-31 characters. Your display name must be unique in the developerWorks community and should not be your email address for privacy reasons.

Required fields are indicated with an asterisk (*).

(Must be between 3 – 31 characters.)

By clicking Submit, you agree to the developerWorks terms of use.

 


All information submitted is secure.

Dig deeper into IBM collaboration and social software on developerWorks


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=1
Zone=Lotus, WebSphere
ArticleID=242229
ArticleTitle=Using the IBM Lotus Expeditor micro broker MQTT client to publish messages
publish-date=07242007