Skip to main content

By clicking Submit, you agree to the developerWorks terms of use.

The first time you sign into developerWorks, a profile is created for you. Select information in your developerWorks profile is displayed to the public, but you may edit the information at any time. Your first name, last name (unless you choose to hide them), and display name will accompany the content that you post.

All information submitted is secure.

  • Close [x]

The first time you sign in to developerWorks, a profile is created for you, so you need to choose a display name. Your display name accompanies the content you post on developerworks.

Please choose a display name between 3-31 characters. Your display name must be unique in the developerWorks community and should not be your email address for privacy reasons.

By clicking Submit, you agree to the developerWorks terms of use.

All information submitted is secure.

  • Close [x]

Using the IBM Lotus Expeditor micro broker MQTT client to publish messages


Return to article



Listing 3. Complete application with callbacks

import com.ibm.mqttclient.MqttAdvancedCallback;
import com.ibm.mqttclient.MqttClient;
import com.ibm.mqttclient.MqttException;
import com.ibm.mqttclient.factory.MqttClientFactory;
import com.ibm.mqttclient.factory.MqttProperties;
import com.ibm.mqttclient.utils.MqttPayload;

/**
* Example MqttPublisher for DeveloperWorks article.
* 
*/
public class MqttPublisher implements MqttAdvancedCallback {

   /**
    * URL for the Mqtt compliant broker
    * 
    */
   private final String brokerURL;

   /**
    * MQTT Client.
    */
   private final MqttClient client;


   /**
    * Invoke from the command line with a single parameter, the broker URI,
    * e.g. tcp://mybroker:1883.
    */
   public static void main(String args[]) {

       MqttPublisher publisher = null;

       try {

           publisher = new MqttPublisher(args[0]);

           /**
            * Connect the newly created publisher to the supplied broker.
            */
           publisher.connect();

           /**
            * Publish an "Arrival" message.
            */
           publisher.publishMessage(
                   "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
                   (byte) 2, "Arrived");

           /**
            * Sleep for 1 second waiting to receive notification of
            * publication. Real applications should use appropriate
            * inter-thread signaling methodologies such as wait/notify, cyclic
            * barriers and latches.
            */
           Thread.sleep(1000);

       }
       catch (MqttException exception) {
           System.err.println("Exception occurred during either 
           instantiation, connection, or publication: "
                   + exception.getMessage());
       }
       catch (InterruptedException exception) {
           System.err.println("Interrupted while waiting for publication: "
                   + exception.getMessage());
       }
       finally {
           try {
               /**
                * Close the publisher if instantiated.
                */
               if (publisher != null) {
                   publisher.disconnectClient();
               }
           }
           catch (MqttException exception) {
               System.err.println("Exception occurred closing publisher: "
                       + exception.getMessage());
           }
       }
   }


   /**
    * Construct a new MqttPublisher containing an unconnected MqttClient.
    * 
    * @param brokerURL
    *            Broker URL to (eventually) connect to.
    * @throws MqttException
    *             If an underlying MQTT error occurs instantiating the client
    *             object.
    */
   private MqttPublisher(String brokerURL) throws MqttException {
       this.brokerURL = brokerURL;
       this.client = createClient();
   }


   /**
    * Create a MqttClient object after configuring the MqttProperties object as
    * required.
    */
   private MqttClient createClient() throws MqttException {

       MqttProperties mqttProps = new MqttProperties();
       // Stateless client
       mqttProps.setCleanStart(true);

       /**
        * Create the client from the factory. The client ID for this client is
        * "testClient" and the URL in the second parameter describes the
        * location of the remote broker.
        */
       MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
               "testClient", brokerURL, mqttProps);

       return mqttClient;
   }


   /**
    * Connect the MqttClient to a broker.
    * 
    * @throws MqttException
    *             If an error occurs during connection operations.
    */
   private void connect() throws MqttException {

       /**
        * Register this application for callbacks from the client
        */
       client.registerCallback(this);

       /**
        * Connect the client to a broker.
        */
       client.connect();

   }


   /**
    * Disconnect the client.
    */
   public void disconnectClient() throws MqttException {
       if (client != null) {
           client.disconnect();
       }
   }


   /**
    * Publish a string as a message in byte form with the given quality of
    * service to the given topic.
    */
   public void publishMessage(String topic, byte qos, String message)
           throws MqttException {
       client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
               false);
   }


   /**
    * Callback method defined in the MqttAdvancedCallback interface. This method 
    indicates to the application that a publication 
    * sent with the given message ID was published successfully by the 
    broker. Note that this will only be called for publications
    * with a QoS greater than zero.
    */
   public void published(int msgId) {
       System.out.println("Message with ID " + msgId
               + " published successfully.");

   }


   /**
    * Callback method defined in the MqttAdvancedCallback interface. This method i
    ndicates to the application that a subscription
    * made by the client using subscribe() was a success. The byte array is 
    sequenced in the same order as the requested QoS array passed
    * to the call to subscribe and indicates the corresponding QoS with 
    which the subscription in question was lodged with the broker.
    */
   public void subscribed(int msgId, byte[] grantedQoS) {
       // Not required for this example
   }


   /**
    * Callback method defined in the MqttAdvancedCallback interface. This method 
    indicates to the application that a request to un-subscribe from a topic
    * made by the client using unsubscribe() was a success.
    */
   public void unsubscribed(int msgId) {
       // Not required for this example
   }


   /**
    * Callback method defined in the MqttAdvancedCallback interface. Indicates to the 
    client application that the connection to the broker was lost,
    * providing the root cause to aid diagnosis.
    */
   public void connectionLost(Throwable cause) {
       // Not required for this example
   }


   /**
    * Callback method defined in the MqttAdvancedCallback interface. This is the callback 
    method that is called when a publication has been sent to the
    * client from the broker. The parameters contain the message data in the form 
    of the source topic, the byte payload, the quality of service of the 
    * message, whether the message was retained, and a unique message ID.
    */
   public boolean publishArrived(String topicName, MqttPayload payload,
           byte qos, boolean retained, int msgId) {
       return true;
   }

} 


Return to article