Содержание


Использование MQTT-клиента микроброкера IBM Lotus Expeditor для публикации сообщений

Связующее ПО для обмена сообщениями предоставляет надёжные и гибкие коммуникационные сервисы в контексте решения по бизнес-интеграции. Одна из многих технологий IBM в области связующего ПО для обмена сообщениями называется MQ Telemetry Transport (MQTT). Это протокол, поддерживаемый микроброкером Lotus Expeditor. MQTT представляет собой основанный на TCP/IP протокол обмена сообщениями publish/subscribe (издатель-подписчик), предназначенный для использования в сетях, требующих минимальных накладных расходов. Микроброкер - это маленький брокер сообщений (менее 2 МБ Java-кода), спроектированный для развёртывания на небольших специализированных устройствах, часто удалённых от корпоративного информационного центра. В этой статье мы создадим пример публикатора, способного подключаться к микроброкеру Lotus Expeditor, выполним публикацию по теме и проверим получение брокером этого сообщения. Ознакомившись со статьёй, вы получите все знания, необходимые для создания простого MQTT-публикатора для вашего предприятия.

Об истории, причинах создания, характеристиках протокола и инструкциях по подписке на MQTT-совместимый брокер можно прочитать в другой статье: Использование MQ Telemetry Transport с WebSphere Business Integration, часть 1: Подписка (EN). В нашей сегодняшней статье более подробно разбираются концепции, речь о которых шла в вышеупомянутой статье; кроме того, рассматривается публикация сообщений при помощи MQTT. В предыдущей статье затрагивалась подписка с помощью IBM Websphere Message Broker; а в нынешней статье разбирается публикация сообщений с помощью более нового компонента – микроброкера Lotus Expeditor. Рассматриваемые в данной статье положения относятся также и к WebSphere Message Broker, поскольку оба брокера поддерживают протокол MQTT.

Эта статья представляет собой руководство по написанию MQTT-публикаторов на Java. В примере приложения, подробно рассматриваемом в статье, моделируется публикация сообщений с уведомлениями о прибытии рейсов в аэропорт. Для простоты в данном сценарии мы ограничимся двумя авиакомпаниями (Air Freedom и Northern Air) и двумя аэропортами (Роли-Дарем/RDU и лондонский Хитроу/LHR). Для компиляции примера из данной статьи требуется библиотека MQTT, поставляемая с микроброкером Lotus Expeditor. Кроме того, для подключения и публикации сообщений с помощью примера MQTT-клиента необходим установленный и запущенный микроброкер Lotus Expeditor.

Темы

В обмене сообщениями по модели publish/subscribe адресаты сообщений называются темами (topics). В протоколе MQTT используется иерархическое пространство тем, то есть структура тем может быть такой, чтобы позволять подписчикам и публикаторам указывать тему адресата с разными степенями точности. Правила MQTT, касающиеся пространства тем, немногочисленны, и проектирование логического информационного пространства, подходящего для вашего приложения, возлагается на вас.

Опыт показывает, что более подробное пространство тем предпочтительнее сжатого, менее подробного. Например, использовать в качестве тем a, b или c не рекомендуется. Хотя при использовании коротких тем экономятся сетевые ресурсы, правильно спланированное и более подробное пространство тем, позволяющее в выполняющих подписку приложениях использовать подстановочные знаки, имеет существенные преимущества.

Тема - это произвольная строка, в которую входят любые символы с однобайтной кодировкой. Символы /, + и # имеют, однако, специальное значение, что обсуждается ниже. Длина темы не может превышать 32767 символов.

На рисунке 1 приведён пример логически сконструированного пространства тем для сообщений о прибытии и отправлении рейсов. Это пространство тем используется во всех примерах нашей статьи.

Рисунок 1. Пространство тем
Пространство тем
Пространство тем

Иерархия тем следующая: Flight Times (время прибытия/отправления) - Airport (аэропорт) - Airline (авиакомпания) - Arrivals or Departures (прибытие или отправление) - Flight Number (номер рейса). При конвертировании иерархии тем в строку используется специальный символ "косая черта" (/). Он разграничивает иерархию тем на отдельные разделы, добавляя дополнительный логический уровень точности. Например, чтобы опубликовать сообщение о рейсе авиакомпании Air Freedom № 1326, прибывающем в аэропорт RDU, тема сообщения должна выглядеть следующим образом: Flight Times/RDU/Air Freedom/Arrivals/Flight 1326. Это логически сконструированное пространство тем позволяет различным группам подписчиков создавать подписки, доставляющие лишь интересующие их сообщения. Неправильно спланированное пространство тем часто приводит к избыточности подписки для клиента, когда клиент подписан на более широкое пространство тем, чем необходимо, и получает, таким образом, не нужные ему сообщения. Для таких сообщений требуется их дополнительная фильтрация клиентским приложением с целью определить нужные сообщения. Избыточная подписка приводит к лишней нагрузке на канал и снижает эффективность клиента.

В следующих разделах описываются три чётко выраженные группы подписчиков, интересующихся временем прибытия и отправления рейсов во всём мире:

  • Человек, встречающий рейс в аэропорту
  • Аэропорт, выводящий на табло информацию о рейсах
  • Авиакомпания, отслеживающая процент рейсов, прибывших по графику

В первом сценарии человек, на телефоне которого установлено клиентское MQTT-приложение, встречает кого-то в аэропорту. В этом случае этот человек интересуется лишь одним рейсом (Air Freedom № 1024) в конкретном аэропорту (LHR). Для получения уведомления о прибытии этого рейса клиентское приложение подписывается на Flight Times/LHR/Air Freedom/Arrivals/Flight 1024.

Во втором сценарии аэропорт Хитроу (LHR) отображает для посетителей аэропорта информацию о прибытии и отправлении рейсов. В этом сценарии клиента интересуют только прибытия и отправления в аэропорту LHR. Логическое пространство тем позволяет одной подписке запрашивать доставку любой информации о прибытиях и отправлениях в/из LHR. Необходимая строка темы подписки - Flight Times/LHR/#. Символ # - это специальный подстановочный знак, соответствующий всем темам в пространстве тем справа от себя. Его можно рассматривать как символ подписки на поддерево пространства тем.

В последнем сценарии авиакомпания собирает статистику прибытий по графику. В нашем примере авиакомпании Northern Air требуется процент прибытий по графику по всему миру. Поэтому компании Northern Air требуется единая подписка на время прибытия рейсов во всём мире. В этом примере Northern Air интересуют только прибытия, а не отправления. Строка темы, запрашивающая интересующую в данном случае компанию Northern Air информацию, - Flight Times/+/Northern Air/Arrivals/#. В этой строке темы используется специальный подстановочный символ +, позволяющий Northern Air подписаться на прибытия во все аэропорты. В отличие от символа # он соответствует всего одному уровню иерархии пространства тем, не затрагивая более низкие уровни иерархии, как символ #.

При создания MQTT-приложения для создания логичной и гибкой системы подписки необходимо тщательно спланировать пространство тем. Такой подход гарантирует, что большинству потребителей публикаций не потребуется множество подписок или избыточная подписка.

Подключение MQTT

Для публикации сообщений при помощи MQ Telemetry Transport требуется соединение c микроброкером Lotus Expeditor или другим сервером обмена сообщениями, поддерживающим протокол MQTT, например, WebSphere Message Broker. Для создания подключения к брокеру необходимо выполнить несколько шагов. Создаётся объект свойств MQTT, который затем передаётся фабрике создания клиента. Этот объект свойств предоставляет конфигурацию созданному экземпляру клиента. Одно из этих свойств - булев флаг (Boolean flag), указывающий, является ли клиентское приложение "клиентом без сохранения сеанса". Если значение флага - true, при каждом подключении клиент не будет знать о предыдущем соединении с брокером (например, о любых ранее сделанных подписках или об ожидающих доставки сообщениях). Если значение флага - false, состояние клиента при различных подключениях к брокеру остаётся прежним; например, клиентскому приложению не потребуется повторная подписка при каждом последующем переподключении. Кроме того, в этом случае клиент и брокер пытаются возобновить любой выполняющийся в данный момент обмен сообщениями (в зависимости от определённого для сообщений качества сервиса), прерванный при разрыве соединения. Для использования "клиента с сохранением сеанса" необходимо обеспечить реализацию интерфейса MqttPersistence. Включение реализации этого интерфейса обозначает для фабрики создания клиента, что клиентскому приложению требуется персистентная (надёжная) доставка сообщений. В нашем примере используется "клиент без сохранения сеанса" и предполагается, что сеть достаточно надёжна. После настройки свойств мы получаем экземпляр MQTT-клиента из фабрики MQTT-клиента. Для создания экземпляра MQTT-клиента требуется несколько параметров, в том числе уникальный идентификатор клиента (client ID), IP-адрес и порт брокера, а также дополнительный объект MqttProperties, описанный ранее.

Идентификатор клиента идентифицирует каждого клиента брокеру. В первую очередь это гарантирует надёжную передачу сообщений и поддерживает состояние подписки при множественных подключениях и отключениях клиента. Важно заметить, что идентификатор каждого клиента, подключающегося в брокеру, должен быть отличен от других. Если два клиента попытаются выполнить подключение к брокеру с помощью одного и того же идентификатора клиента, предпочтение будет отдано последнему соединению, а более раннее будет принудительно отключено. Это сделано для того, чтобы обеспечить повторное подключение клиента к предыдущему соединению, которое не было полностью очищено. Идентификатор клиента может иметь длину до 23 символов. См. листинг 1.

Листинг 1. Подключение
    /**
    * При необходимости создаём объект MqttClient
    * после настройки объекта MqttProperties.
    */
   private MqttClient createClient() throws MqttException {

       MqttProperties mqttProps = new MqttProperties();
       // Не хранящий состояние клиент "без сохранения сеанса"
       mqttProps.setCleanStart(true);

       /**
        * Создаём клиент из фабрики. Идентификатор клиента для этого
        * клиента - "testClient", а URL-адрес во втором параметре описывает
        * местонахождение брокера, в данном случае, на локальном компьютере
        */
       MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
               "testClient", "tcp://mybroker:1883", mqttProps);

       return mqttClient;
   }


   /**
    * Подключаем MqttClient к брокеру.
    * 
    * @throws MqttException
    *             При ошибке во время операции подключения.
    */
   private void connect() throws MqttException {

       /**
        * Регистрируем это приложение для обратного вызова из клиента
        */
       client.registerCallback(this);

       /**
        * Подключаем клиент к брокеру.
        */
       client.connect();

   }

Публикация

После успешного подключения MQTT можно публиковать сообщения. Приложения выполняют публикацию через объект MQTT-клиента. Сигнатура метода для публикации сообщения - int publish(String, MqttPayload, byte, Boolean). Ниже подробно рассматриваются эти четыре параметра:

  • String. Тип параметра темы - строковой (string), и эта строка используется брокером для сопоставления публикации и интересов подписчиков (указываемых при помощи описанного ранее синтаксиса подписки на темы).
  • MqttPayload. Второй параметр - это объект MqttPayload. Объект MqttPayload содержит и данные приложения, и любые заголовки протокола для этой публикации. Предусмотрен сдвиг (offset), чтобы приложения могли определить, где в MqttPayload начинаются данные. Это обеспечивает доступ к нижележащему байтовому массиву без создания дополнительных копий после записи данных в сети. Кроме того, обеспечивается доступ для прямого манипулирования полезными данными после создания объекта и до передачи.
  • Byte. Третий параметр, byte, - это качество сервиса (Quality of Service, QoS) для этой публикации. У QoS три допустимых значения: 0, 1 или 2:

    • QoS, равное 0, означает, что публикатор и брокер пытаются выполнить однократную доставку сообщения, но не предпринимают шаги, кроме предусмотренных TCP/IP, чтобы убедиться в том, что сообщение доставлено. Этот уровень иногда называют "выстрелил и забыл", так как сообщение отправляется адресату без проверки получения.
    • При QoS, равном 1, доставка сообщения брокеру проверяется; однако его разрешается доставлять более одного раза.
    • Значение QoS, равное 2, приказывает MQTT доставлять сообщения один и только один раз.
    Каждое увеличение уровня QoS приводит к дополнительной нагрузке на процессор и сеть. Выбор QoS может повлиять на общие возможности масштабирования вашего решения по обмену сообщениями, а также создаёт дополнительную нагрузку на клиент, связанную с хранением недоставленных сообщений. Следовательно, нужно правильно выбрать соответствующий уровень QoS для каждого опубликованного сообщения. Как правило, следует использовать более низкие значения QoS, если только не требуется более жёсткая гарантия доставки сообщений. Значение QoS, указанное для сообщения, определяет качество сервиса для публикации между клиентом и брокером. Кроме того, это значение устанавливает максимальный уровень QoS, который брокер использует для доставки этого сообщения своим подписчикам.

    Подписчики могут указать максимальное значение QoS для доставки сообщений на основе темы, поэтому сообщение, опубликованное с уровнем QoS 2, может быть и не доставлено подписчикам. Подписчик может запросить пониженный уровень QoS для получаемых им сообщений. Хотя может показаться странным, что публикатор не полностью управляет качеством сервиса своих сообщений, в результате увеличивается гибкость для потребителей сообщений. Когда опубликованное сообщение отправляется подписчику, брокер доставляет публикацию либо с максимальным значением QoS, указанным подписчиком во время процесса подписки, либо со значением QoS опубликованного сообщения, если оно ниже. Например, сообщение, опубликованное с QoS = 2 для подписчика, указавшего QoS = 1 для данной темы, доставляется с QoS = 1. Сообщение с QoS = 0, опубликованное для того же подписчика по этой же теме, отправляется подписчику с QoS = 0.
  • Boolean. Четвёртый параметр, булев флаг (Boolean flag), определяет, является ли эта публикация задержанной. Задержанная публикация удерживается в брокере как последнее полученное сообщение по данной теме. Задержанные публикации позволяют следующим подписчикам получать самые последние сообщения по какой-либо теме, как только они на неё подпишутся, даже если подключение произойдёт после того, как сообщение будет опубликовано. Эта возможность очень полезна для наполнения данными отображающего информацию приложения сразу после его запуска и последующего обновления этой информации при её изменении. Если значение этого флага - false, сообщение получат только подписчики, в данный момент подписанные на эту тему. В примере, приведённом в листинге 2, используется незадержанная публикация.

Метод публикации возвращает целочисленный идентификатор сообщения. Его можно использовать вместе с зарегистрированным методом MqttAdvancedCallback, чтобы определить время получения сообщения брокером.

Код в листинге 2 публикует сообщение, означающее, что рейс Air Freedom № 1024 прибыл в лондонский аэропорт Хитроу (LHR).

Листинг 2. Публикация
    /**
    * Вызываем из командной строки URI брокера с одним параметром,
    * например, tcp://mybroker:1883.
    */
   public static void main(String args[]) {

       MqttPublisher publisher = null;

       try {

           publisher = new MqttPublisher(args[0]);

           /**
            * Подключаем созданный публикатор к заданному брокеру.
            */
           publisher.connect();

           /**
            * Публикуем сообщение "Arrival".
            */
                
           publisher.publishMessage(
                   "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
                   (byte) 2, "Arrived");

           /**
            * Одну секунду ждём получения уведомления о публикации.
            * В реальных приложениях должны использоваться соответствующие
            * межпотоковые сигнальные механизмы, например, wait/notify, 
            * циклические барьеры или регистры-защёлки.
            */
           Thread.sleep(1000);

       }
       catch (MqttException exception) {
           System.err.println("Exception occurred during either instantiation, 
           connection, or publication: "
                   + exception.getMessage());
       }
       catch (InterruptedException exception) {
           System.err.println("Interrupted while waiting for publication: "
                   + exception.getMessage());
       }
       finally {
           try {
               /**
                * Закрываем публикатор, если экземпляр создан.
                */
               if (publisher != null) {
                   publisher.disconnectClient();
               }
           }
           catch (MqttException exception) {
               System.err.println("Exception occurred closing publisher: "
                       + exception.getMessage());
           }
       }
   }


   /**
    * Конструируем новый MqttPublisher, содержащий неподключенный MqttClient.
    * 
    * @param brokerURL
    *            URL брокера для подключения (в итоге).
    * @throws MqttException
    *             При ошибке MQTT создаётся объект
    *             клиента.
    */
   private MqttPublisher(String brokerURL) throws MqttException {
       this.brokerURL = brokerURL;
       this.client = createClient();
   }
 
 /**
  * Публикуем строку как сообщение в байтовой форме, с заданным
  * качеством сервиса по данной теме.
  */
  public void publishMessage(String topic, byte qos, String message)  
  throws MqttException {
       client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
               false);
   }

Обратные вызовы

Наш пример клиента подключен к брокеру и может публиковать сообщения. Аналогично клиентам подписки, описанным в другой нашей статье, обратные вызовы предоставляют публикаторам расширенную функциональность. Для получения подтверждения подписки необходимо создать обработчик обратных вызовов и зарегистрировать его при помощи объекта MQTT-клиента. Существует два типа обработчиков обратных вызовов: простой и продвинутый. Они реализуются при помощи интерфейсов MqttCallback и MqttAdvancedCallback, соответственно. Интерфейс MqttAdvancedCallback расширяет MqttCallback, поэтому при использовании продвинутого интерфейса обратных вызовов необходимо также реализовать методы, определённые в простом интерфейсе. Для использования продвинутого интерфейса кроме методов, унаследованных от простого обработчика обратных вызовов, нужно реализовать ещё три метода: subscribed(int, byte[]), unsubscribed(int) и published(int).

Первые два метода обратного вызова, subscribed(int, byte[]) и unsubscribed(int), предназначены для клиентов, которые хотят отслеживать подтверждения подписки. Метод subscribed вызывается клиентом, когда запрос на подписку был подтверждён брокером. Подобным же образом второй метод, unsubscribed, вызывается при подтверждении запроса на отказ от подписки на тему. Поскольку в нашем примере упор делается на публикацию, два этих метода в примере клиента не используются; однако для успешной компиляции кода данная реализация потребуется как основа.

Наиболее интересный метод для клиента публикации - метод published(int). Данный метод обеспечивает уведомления об успешной доставке сообщения брокеру. У метода имеется один целочисленный параметр, messageID. Приложениям может потребоваться соотнести messageID с messageID, возвращённым методом publish. Обратный вызов инициируется только для сообщений, опубликованных со значением QoS 1 или 2. Персистентность (долговременное хранение) на стороне клиента, предоставляемое реализацией MqttPersistence, в сочетании с использованием QoS = 1 или 2, делает применение этого обратного вызова для гарантирования доставки сообщения избыточным. На случай прерывания соединения MQTT-клиент отслеживает любые выполняющиеся публикации и пытается завершить доставку сообщения после повторного подключения. Тем не менее в некоторых приложениях может оказаться полезной синхронизации доставки сообщений или создание собственной семантики проверки их доставки.

Для получения уведомлений реализация продвинутого интерфейса обратных вызовов должна быть зарегистрирована при помощи MQTT-клиента. Метод registerCallback обеспечивает функции регистрации для MQTT-клиента. Пример кода, приведённый в листинге 3, является расширением листинга 2 и представляет собой полнофункциональный MQTT-публикатор. Интерфейс MqttAdvancedCallback реализуется классом и регистрируется с помощью ранее созданного MQTT-объекта. Этот класс можно запустить из командной строки с одним параметром, содержащим URI брокера, например, tcp://mybroker:1883.

Листинг 3. Готовое приложение с обратными вызовами
import com.ibm.mqttclient.MqttAdvancedCallback;
import com.ibm.mqttclient.MqttClient;
import com.ibm.mqttclient.MqttException;
import com.ibm.mqttclient.factory.MqttClientFactory;
import com.ibm.mqttclient.factory.MqttProperties;
import com.ibm.mqttclient.utils.MqttPayload;

/**
* Пример MqttPublisher для статьи DeveloperWorks.
* 
*/
public class MqttPublisher implements MqttAdvancedCallback {

   /**
    * URL-адрес для Mqtt-совместимого брокера
    * 
    */
   private final String brokerURL;

   /**
    * MQTT-клиент.
    */
   private final MqttClient client;


   /**
    * Вызываем из командной строки URI брокера с одним параметром,
    * например, tcp://mybroker:1883.
    */
   public static void main(String args[]) {

       MqttPublisher publisher = null;

       try {

           publisher = new MqttPublisher(args[0]);

           /**
            * Подключаем созданный публикатор к заданному брокеру.
            */
           publisher.connect();

           /**
            * Публикуем сообщение "Arrival".
            */
           publisher.publishMessage(
                   "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
                   (byte) 2, "Arrived");

           /**
            * Одну секунду ждём получения уведомления о публикации.
            * В реальных приложениях должны использоваться соответствующие
            * межпотоковые сигнальные механизмы, например, wait/notify, 
            * циклические барьеры или регистры-защёлки.
            */
           Thread.sleep(1000);

       }
       catch (MqttException exception) {
           System.err.println("Exception occurred during either 
           instantiation, connection, or publication: "
                   + exception.getMessage());
       }
       catch (InterruptedException exception) {
           System.err.println("Interrupted while waiting for publication: "
                   + exception.getMessage());
       }
       finally {
           try {
               /**
                * Закрываем публикатор, если экземпляр создан.
                */
               if (publisher != null) {
                   publisher.disconnectClient();
               }
           }
           catch (MqttException exception) {
               System.err.println("Exception occurred closing publisher: "
                       + exception.getMessage());
           }
       }
   }


   /**
    * Конструируем новый MqttPublisher, содержащий неподключенный MqttClient.
    * 
    * @param brokerURL
    *            URL брокера для подключения (в итоге).
    * @throws MqttException
    *             При ошибке MQTT создаётся объект
    *             клиента.
    */
   private MqttPublisher(String brokerURL) throws MqttException {
       this.brokerURL = brokerURL;
       this.client = createClient();
   }


   /**
    * При необходимости создаём объект MqttClient
    * после настройки объекта MqttProperties.
    */
   private MqttClient createClient() throws MqttException {

       MqttProperties mqttProps = new MqttProperties();
       // Не хранящий состояние клиент
       mqttProps.setCleanStart(true);

       /**
        * Создаём клиент из фабрики. Идентификатор клиента для этого
        * клиента - "testClient", а URL-адрес во втором параметре описывает
        * местонахождение брокера, в данном случае, на локальном компьютере
        */
       MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
               "testClient", brokerURL, mqttProps);

       return mqttClient;
   }


   /**
    * Подключаем MqttClient к брокеру.
    * 
    * @throws MqttException
    *             При ошибке во время операции подключения.
    */
   private void connect() throws MqttException {

       /**
        * Регистрируем это приложение для обратного вызова из клиента
        */
       client.registerCallback(this);

       /**
        * Подключаем клиент к брокеру.
        */
       client.connect();

   }


   /**
    * Отключаем клиент.
    */
   public void disconnectClient() throws MqttException {
       if (client != null) {
           client.disconnect();
       }
   }


   /**
  * Публикуем строку как сообщение в байтовой форме, с заданным
  * качеством сервиса по данной теме.
    */
   public void publishMessage(String topic, byte qos, String message)
           throws MqttException {
       client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
               false);
   }


   /**
    * Метод обратного вызова, определённый в интерфейсе MqttAdvancedCallback.
    Этот метод указывает приложению, что публикация, отправленная 
    * с данным message ID, была успешно опубликована брокером.
    Имейте в виду, что этот метод вызывается только для
    * публикаций с QoS больше нуля.
    */
   public void published(int msgId) {
       System.out.println("Message with ID " + msgId
               + " published successfully.");

   }


   /**
    * Метод обратного вызова, определённый в интерфейсе MqttAdvancedCallback.
    Этот метод указывает приложению, что публикация, выполненная
    * клиентом с помощью subscribe(), была успешной. Байтовый массив
    упорядочивается в той же последовательности, как и запрошенный
    * массив QoS, переданный вызову subscribe, и обозначает соответствующий QoS,
    с который рассматриваемая подписка была принята брокером.
    */
   public void subscribed(int msgId, byte[] grantedQoS) {
       // В этом примере не требуется
   }


   /**
    * Метод обратного вызова, определённый в интерфейсе MqttAdvancedCallback.
    Этот метод указывает приложению, что запрос на отписку от темы,
    * выполненный клиентом с помощью unsubscribe(), был успешным.
    */
   public void unsubscribed(int msgId) {
       // В этом примере не требуется
   }


   /**
    * Метод обратного вызова, определённый в интерфейсе MqttAdvancedCallback.
    Сообщает клиентскому приложению о разрыве соединения с брокером и
    * указывает его причину для облегчения диагностики.
    */
   public void connectionLost(Throwable cause) {
       // В этом примере не требуется
   }


   /**
    * Метод обратного вызова, определённый в интерфейсе MqttAdvancedCallback. 
    этот метод вызывается после отправки брокером публикации клиенту.
    * Параметры: данные сообщения в виде исходной темы,
    полезная информация в байтах, качество сервиса сообщения,
    * задержанное сообщение или нет, а также уникальный message ID.
    */
   public boolean publishArrived(String topicName, MqttPayload payload,
           byte qos, boolean retained, int msgId) {
       return true;
   }

}

Заключение

MQTT - это мощный транспортный протокол модели обмена сообщениями publish/subscribe. Он полезнее других аналогичных протоколов в тех ситуациях, когда требуются маленький клиент и невысокая нагрузка на сеть. В данной статье рассматривается процесс создания полнофункционального MQTT-публикатора. Приведенный пример клиента подключается к брокеру и публикует сообщения по определённой теме. В примере также показано применение интерфейса MqttAdvancedCallback для генерации уведомлений о доставке сообщений брокеру.


Ресурсы для скачивания


Похожие темы

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Lotus, WebSphere
ArticleID=290212
ArticleTitle=Использование MQTT-клиента микроброкера IBM Lotus Expeditor для публикации сообщений
publish-date=02192008