// Listing 3. Complete application with callbacks
import com.ibm.mqttclient.MqttAdvancedCallback;
import com.ibm.mqttclient.MqttClient;
import com.ibm.mqttclient.MqttException;
import com.ibm.mqttclient.factory.MqttClientFactory;
import com.ibm.mqttclient.factory.MqttProperties;
import com.ibm.mqttclient.utils.MqttPayload;
import java.nio.charset.StandardCharsets;
/**
* Example MqttPublisher for DeveloperWorks article.
*
*/
public class MqttPublisher implements MqttAdvancedCallback {
/**
* URL for the Mqtt compliant broker
*
*/
private final String brokerURL;
/**
* MQTT Client.
*/
private final MqttClient client;
/**
* Invoke from the command line with a single parameter, the broker URI,
* e.g. tcp://mybroker:1883.
*/
public static void main(String args[]) {
MqttPublisher publisher = null;
try {
publisher = new MqttPublisher(args[0]);
/**
* Connect the newly created publisher to the supplied broker.
*/
publisher.connect();
/**
* Publish an "Arrival" message.
*/
publisher.publishMessage(
"Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
(byte) 2, "Arrived");
/**
* Sleep for 1 second waiting to receive notification of
* publication. Real applications should use appropriate
* inter-thread signaling methodologies such as wait/notify, cyclic
* barriers and latches.
*/
Thread.sleep(1000);
}
catch (MqttException exception) {
System.err.println("Exception occurred during either
instantiation, connection, or publication: "
+ exception.getMessage());
}
catch (InterruptedException exception) {
System.err.println("Interrupted while waiting for publication: "
+ exception.getMessage());
}
finally {
try {
/**
* Close the publisher if instantiated.
*/
if (publisher != null) {
publisher.disconnectClient();
}
}
catch (MqttException exception) {
System.err.println("Exception occurred closing publisher: "
+ exception.getMessage());
}
}
}
/**
* Construct a new MqttPublisher containing an unconnected MqttClient.
*
* @param brokerURL
* Broker URL to (eventually) connect to.
* @throws MqttException
* If an underlying MQTT error occurs instantiating the client
* object.
*/
private MqttPublisher(String brokerURL) throws MqttException {
this.brokerURL = brokerURL;
this.client = createClient();
}
/**
* Create a MqttClient object after configuring the MqttProperties object as
* required.
*/
private MqttClient createClient() throws MqttException {
MqttProperties mqttProps = new MqttProperties();
// Stateless client
mqttProps.setCleanStart(true);
/**
* Create the client from the factory. The client ID for this client is
* "testClient" and the URL in the second parameter describes the
* location of the remote broker.
*/
MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
"testClient", brokerURL, mqttProps);
return mqttClient;
}
/**
* Connect the MqttClient to a broker.
*
* @throws MqttException
* If an error occurs during connection operations.
*/
private void connect() throws MqttException {
/**
* Register this application for callbacks from the client
*/
client.registerCallback(this);
/**
* Connect the client to a broker.
*/
client.connect();
}
/**
* Disconnect the client.
*/
public void disconnectClient() throws MqttException {
if (client != null) {
client.disconnect();
}
}
/**
* Publish a string as a message in byte form with the given quality of
* service to the given topic.
*/
public void publishMessage(String topic, byte qos, String message)
throws MqttException {
client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
false);
}
/**
* Callback method defined in the MqttAdvancedCallback interface. This method
indicates to the application that a publication
* sent with the given message ID was published successfully by the
broker. Note that this will only be called for publications
* with a QoS greater than zero.
*/
public void published(int msgId) {
System.out.println("Message with ID " + msgId
+ " published successfully.");
}
/**
* Callback method defined in the MqttAdvancedCallback interface. This method i
ndicates to the application that a subscription
* made by the client using subscribe() was a success. The byte array is
sequenced in the same order as the requested QoS array passed
* to the call to subscribe and indicates the corresponding QoS with
which the subscription in question was lodged with the broker.
*/
public void subscribed(int msgId, byte[] grantedQoS) {
// Not required for this example
}
/**
* Callback method defined in the MqttAdvancedCallback interface. This method
indicates to the application that a request to un-subscribe from a topic
* made by the client using unsubscribe() was a success.
*/
public void unsubscribed(int msgId) {
// Not required for this example
}
/**
* Callback method defined in the MqttAdvancedCallback interface. Indicates to the
client application that the connection to the broker was lost,
* providing the root cause to aid diagnosis.
*/
public void connectionLost(Throwable cause) {
// Not required for this example
}
/**
* Callback method defined in the MqttAdvancedCallback interface. This is the callback
method that is called when a publication has been sent to the
* client from the broker. The parameters contain the message data in the form
of the source topic, the byte payload, the quality of service of the
* message, whether the message was retained, and a unique message ID.
*/
public boolean publishArrived(String topicName, MqttPayload payload,
byte qos, boolean retained, int msgId) {
return true;
}
}
// End of listing.