目次


MQTT と「モノのインターネット」サービスを探る

IBM Watson IoT Platform の Java 用クライアント・ライブラリーを使用した機器サイドのプログラムとアプリケーション・サイドのプログラムを開発する

Comments

Watson IoT Platform のサービスは、世界中の多種多様な機器とアプリケーションを相互に接続する、単純ながらも強力な機能を提供しますが、これを可能にしているのは何なのでしょう?Watson IoT Platform を背後で支えるカギとなっているのは、MQTT (Message Queue Telemetry Transport) という軽量で柔軟なネットワーク・プロトコルです。このチュートリアルでは、MQTT の仕組みと、IoT サービスを利用して簡単にアプリケーションを作成する方法を紹介します。

大まかに言うと、Watson IoT Platform は MQTT ブローカーとして機能することから、接続されたクライアント (機器とアプリケーション) にメッセージを配信する役割を果たします。ここで機器とは情報を検出してそれをパブリッシュするマシンなどのことであり、アプリケーションとはこれらの機器から情報を受信して利用するプログラムのことです。機器とアプリケーションは、図 1 に示すように MQTT プロトコルを使用して MQTT ブローカーと通信します。

図 1. IBM Cloud (旧称 IBM Bluemix) サービスと MQTT クライアント
IBM Cloud IoT サービス (MQTT ブローカー) が機器サイド (MQTT クライアント) でコマンドをサブスクライブし、アプリケーション・サイド (MQTT クライアント) でイベントをサブスクライブする仕組みを示す図
IBM Cloud IoT サービス (MQTT ブローカー) が機器サイド (MQTT クライアント) でコマンドをサブスクライブし、アプリケーション・サイド (MQTT クライアント) でイベントをサブスクライブする仕組みを示す図

Watson IoT Platform を利用するアプリケーションは、一般に以下の 3 つの部分からなります。

  • Watson IoT Platform 構成 (機器およびアプリケーションの登録)
  • 機器サイドのプログラミング
  • アプリケーション・サイドのプログラミング

試してみる用意はできましたか?

アプリケーションを作成するために必要となるもの

1

Watson IoT Platform をセットアップする

このステップでは、機器とアプリケーションを Watson IoT Platform に登録します。機器を接続してデータを使用するには、IBM Cloud アカウントを登録してから、そのアカウントで使用する IBM Cloud 組織内に Watson IoT Platform サービスのインスタンスを作成する必要があります。

IBM Cloud ダッシュボードでアプリケーションを作成する

  1. IBM Cloud にログインします。
  2. 上部のメニューで「CATALOG (カタログ)」をクリックします。
  3. 新規サービスを作成するには、「Platform (プラットフォーム)」カテゴリーの下に分類されている「Internet of Things (モノのインターネット)」をクリックし、「Internet of Things Platform」サービスのタイルをクリックします。
    スクリーンショット
    スクリーンショット
  4. 「Service name (サービス名)」フィールドに、サービスの一意の名前を指定します。このチュートリアルでは「IoT-article-demo」という名前を使用します。
    スクリーンショット
    スクリーンショット
  5. 「CREATE (作成)」をクリックします。

IoT サービス・コンソールを起動する

ダッシュボードから、Watson IoT Platform サービスをセットアップして構成します。

  1. IoT-article-demo サービスのページで、「Launch (起動)」をクリックします。
    スクリーンショット
    スクリーンショット

    ボード・ダッシュボードが開きます。以下のような表示内容になっているはずです。スクリーンショット

    スクリーンショット

  2. 左側にあるメニュー・ペインを使用してサービスにナビゲートします。
    スクリーンショット
    スクリーンショット

注: サービス・コンソールを作成した後は、Watson IoT Platform Web サイトから直接サービス・コンソールにアクセスできるようになります。ログインした後、ウィンドウの右上にあるドロップダウン・メニューを開くことで、作成済みのサービス・コンソールのすべてを確認できます。

スクリーンショット
スクリーンショット

機器を登録する

Watson IoT Platform に接続する機器のそれぞれに、機器のタイプが関連付けられていなければなりません。機器のタイプとは、共通の特性を共有するデバイスのグループを意味します。Watson IoT Platform サービスに初めて機器を追加する際は、「Device type (機器のタイプ)」メニューに機器のタイプは 1 つも表示されません。したがって、最初に機器のタイプを作成する必要があります。

  1. 「Devices (機器)」をクリックし、「Device Types (機器のタイプ)」タブをクリックします。
    スクリーンショット
    スクリーンショット
  2. 「Add Device Type (機器のタイプを追加)」をクリックします。
    スクリーンショット
    スクリーンショット
  3. 「Name (名前)」には、機器の名前として「Java_Client」を使用します。
    注: このフィールドには、任意の名前を指定して構いません。
    スクリーンショット
    スクリーンショット
  4. 「Next (次へ)」「Done (完了)」「Cancel (キャンセル)」の順にクリックします。
  5. 「Browse (参照)」タブをクリックし、「Add Device (機器の追加)」をクリックします。
    スクリーンショット
    スクリーンショット
  6. 「Select Existing Device Type (既存の機器のタイプを選択)」フィールドで、先ほど作成した機器のタイプ「Java_Client」を選択します。
    スクリーンショット
    スクリーンショット
  7. 「Device ID (機器 ID)」フィールドに、機器に固有の ID を入力します (例えば、314159)。独自の ID を入力して構いません。
  8. 「Next (次へ)」をクリックします。
  9. セキュリティー資格情報を自動生成するデフォルト設定を受け入れて「Next (次へ)」をクリックし、「Done (完了)」をクリックします。
    スクリーンショット
    スクリーンショット
  10. 今後の使用に備えて、認証トークンを書き留めるか、テキスト・エディターにコピー・アンド・ペーストします。
    スクリーンショット
    スクリーンショット

機器を Watson IoT Platform に登録した後は、その登録情報を使用して、機器を接続し、機器からのデータを受信し始めることができます。

アプリケーションを登録する

アプリケーションを Watson IoT Platform に接続するには、API キーとトークンを使用して接続するか、IBM Cloud 内の Watson IoT Platform に直接アプリケーションをバインドする必要があります。その後、アクセス権ダッシュボードを使用してアクセス権を付与します。

  1. Watson IoT Platform ダッシュボードの左側にあるメニュー・ペインで「Apps (アプリケーション)」をクリックし、「Generate API Key (API キーの生成)」をクリックします。
    スクリーンショット
    スクリーンショット
  2. 「API Key (API キー)」「Authentication Token (認証トークン)」の値をテキスト・エディターにコピーし、保存します。この情報は後で使用します。
    スクリーンショット
    スクリーンショット
  3. 「Generate (生成)」をクリックします。
    スクリーンショット
    スクリーンショット

必要に応じて、上記のステップを繰り返して他のアプリケーションも登録します。

2

機器サイドのプログラムを作成する

機器サイドのプログラムは、以下の 3 つの部分からなります。

  • IoT サービス (MQTT ブローカー) に接続する部分
  • アプリケーションにイベントをパブリッシュする部分
  • アプリケーションのコマンドをサブスクライブする部分

このセクションでは、Java クライアント・ライブラリーを使用した単純な機器サイドのプログラムを作成します。使用するライブラリーは、このリンク先の ibm-watson-iot GitHub リポジトリー内に保管されています。詳細については、ソース・コードを参照してください。

IoT サービスに接続する

以下に記載するコードに、機器サイドの MQTT プログラムのメイン・スレッドが示されています。このスレッドによって、ダッシュボード内で作成した Watson IoT Platform 機器のうちの 1 つに接続します。このプログラムを実行するには、コマンド・ラインを使用して接続パラメーターを渡します。このプログラムでは、Watson IoT Platform に登録されている所定の機器に対応する複数のインスタンスを実行できるようになっていますが、実際の物理センサー機器では、その機器に固有の SDK を使用して機器固有の MQTT プログラムを実装します。

public static void main(String[] args) throws Exception {
  Properties options = new Properties();
  options.put("org", args[0]);
  options.put("type", args[1]);
  options.put("id", args[2]);
  options.setProperty("auth-method", "token");
  options.setProperty("auth-token", args[3]);

  try {
    myClient = new DeviceClient(options);
  } catch (Exception e) {
    e.printStackTrace();
  }

  // More code to handle commands from apps 
  myClient.connect();

  // More code to send events to apps
}

サンプル機器のコマンド・ライン引数は以下のとおりです。

  • args[0] は、org パラメーターです (7z5mnk)。
  • args[1] は、機器のタイプです (Java_Client)。
  • args[2] は、機器の ID です (314159)。
  • args[3] は、機器のアクセス・トークン (シークレット) です (jiw0cK1j84*e_dDjWd)。

イベントをパブリッシュする

この例での機器の主な役割は、カウンターを起動して、5 秒間隔で現在のカウントをイベントとしてWatson IoT Platform に送信することです。Watson IoT Platform は受信したイベントを、機器のイベントを listen しているすべてのアプリケーションにブロードキャストします。

// More code to send events to apps
Runnable countRunnable = new Runnable() {
  public void run() {
    JsonObject event = new JsonObject();
    event.addProperty("count", count);
    myClient.publishEvent("status", event);
    count++;
  }
};
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(countRunnable, 0, 5, TimeUnit.SECONDS);

コマンドをサブスクライブする

Watson IoT Platform 上で機器に接続すると、その機器は、同じ Watson IoT Platform サービス (つまり、前に作成した IoT-article-demo サービス) に登録されているアプリケーションからコマンドを受信できる状態になります。

次に必要となる作業は、Command Callback インターフェースを実装して、作成した Device Client にコールバックを登録することです。このコールバックの実装では、コマンドをキューにプッシュしてから、別のスレッドを使用してキューからコマンドを読み取って処理します。この設計により、コマンド・ハンドラー・スレッドがブロックされることなく、すべてのコマンドを受信した順に処理できるようにします。

コマンド・ハンドラー・スレッド内で、機器はアプリケーションからのコマンドを解析します。コマンドが restart-counter であれば、機器は自身のカウンターを再起動します。

public static void main(String[] args) throws Exception {
  // ... ...

  // More code to handle commands from apps
  AppCommandHandler handler = new AppCommandHandler();
  myClient.setCommandCallback(handler);
  myClient.connect();

  // Run the event processing thread
  Thread thread = new Thread(handler);
  thread.start();
}

class AppCommandHandler implements CommandCallback, Runnable {

  // A queue to hold and process the Events 
  // for smooth handling of MQTT messages
  // as some commands may take a long time to process
  private BlockingQueue<Command> cmdQueue = new LinkedBlockingQueue<Command>();

  @Override
  public void processCommand(Command c) {
    try {
      cmdQueue.put(c);
    } catch (InterruptedException e1) {
      e1.printStackTrace();
    }
  }

  @Override
  public void run() {
    while(true) {
      Command c = null;
      try {
        c = cmdQueue.take();

        JsonParser jsonParser = new JsonParser();
        JsonElement ele = jsonParser.parse(c.getPayload());

        if ("restart-counter".equalsIgnoreCase(ele.getAsJsonObject().get("name").getAsString())) {
          SampleDevice.count = 0;
        }
      } catch (InterruptedException e1) {
        continue;
      }
    }
  }
}

注: 説明のために、機器サイドのプログラムは Java で作成されていますが、実際にプログラムを実行するには、機器上に完全な JVM が必要です。機器としてラップトップを使用することはできますが、完全に JVM 対応のコンピューターは現実的な IoT 機器とは言えません (私が前に Watson IoT Platform の MQTT を使用して作成した developerWorks チュートリアルでは、現実的な IoT 機器として NodeMCU ボードを使用しました)。センサー機器を内蔵する実際の機器用のプログラムを作成するには、このリンク先のページから入手できる ibm-watson-iot クライアント・ライブラリーのいずれかを使用できます (例えば、Embedded C++)。

3

アプリケーション・サイドのプログラムを作成する

機器サイドのプログラミングと同じく、アプリケーション・サイドのプログラミングは以下の 3 つの部分からなります。

  • IoT サービス (MQTT ブローカー) に接続する部分。
  • 機器または MQTT ブローカーからのイベントにサブスクライブする部分。
  • 機器にコマンドをパブリッシュする部分。

このセクションでは、ibm-watson-iot Java クライアント・ライブラリーを使用した単純なアプリケーション・サイドのプログラムを作成します。詳細については、ソース・コードを参照してください。

IoT サービスに接続する

以下に記載するコードに、MQTT アプリケーションのメイン・スレッドが示されています。このスレッドによって、ダッシュボード内で作成した Watson IoT Platform アプリケーションに接続します。アプリケーションを実行するには、コマンド・ラインを使用して接続パラメーターを渡します。

public static void main(String[] args) throws Exception {
  Properties options = new Properties();
  options.put("org", args[0]);
  options.put("id", args[1]);
  options.put("Authentication-Method","apikey");
  options.put("API-Key", args[2]);
  options.put("Authentication-Token", args[3]);

  ApplicationClient myClient = new ApplicationClient(options);
  myClient.connect();

  // More code to handle events from devices
}

サンプル・プロジェクトのコマンド・ライン引数は以下のとおりです。

  • args[0] は、org パラメーターです (7z5mnk)。
  • args[1] は、アプリケーションの id パラメーターです (dw-sample-app)。
  • args[2] は、アプリケーションの API キーです (a-7z5mnk-3ail3qwogk)。
  • args[3] は、アプリケーションの API トークン (シークレット) です (j-T56vZHOjFdTz3iXQ)。

イベントをサブスクライブする

Watson IoT Platform 上でアプリケーションに接続すると、そのアプリケーションは、同じ Watson IoT Platform サービス (つまり、前に作成した IoT-article-demo サービス) に登録されている機器からイベントを受信できる状態になります。

次に必要となる作業は、EventCallback インターフェースを実装して、作成した ApplicationClient にコールバックを登録することです。このコールバックの実装では、イベントをキューにプッシュしてから、別のスレッドを使用してキューからイベントを読み取ります。この設計により、イベント・ハンドラー・スレッドがブロックされることなく、すべてのイベントを受信した順に処理できるようにします。

イベント・ハンドラー・スレッド内で、アプリケーションは機器イベントに含まれるカウント値を解析します。値が 4 より大きければ、アプリケーションはカウンターを再起動するためのコマンドを機器に送信します。

public static void main(String[] args) throws Exception {
  // ... ...

  ApplicationClient myClient = new ApplicationClient(options);
  myClient.connect();

  DeviceEventHandler handler = new DeviceEventHandler();
  handler.setClient(myClient);

  myClient.setEventCallback(handler);
  myClient.subscribeToDeviceEvents();

  // Run the event processing thread
  Thread thread = new Thread(handler);
  thread.start();
}

class DeviceEventHandler implements EventCallback, Runnable {

  private ApplicationClient client;

  // A queue to hold and process the Events 
  // for smooth handling of MQTT messages
  // as some events may take a long time to process
  private BlockingQueue<Event> evtQueue = new LinkedBlockingQueue<Event>();

  public void processEvent(Event e) {
    try {
      evtQueue.put(e);
    } catch (InterruptedException e1) {
      e1.printStackTrace();
    }

  }

  @Override
  public void processCommand(Command cmd) {
    System.out.println("Command received:: " + cmd);
  }

  @Override
  public void run() {
    while(true) {
      Event e = null;
      try {
        e = evtQueue.take();

        // Check count value
        JsonParser jsonParser = new JsonParser();
        JsonElement ele = jsonParser.parse(e.getPayload());

        int count = ele.getAsJsonObject().get("count").getAsInt();

        if (count >= 4) {
          // Send "restart" command back to this device
          // ... ...
        }
      } catch (InterruptedException e1) {
        continue;
      }
    }
  }

  public ApplicationClient getClient() {
    return client;
  }
  public void setClient(ApplicationClient client) {
    this.client = client;
  }
}

コマンドをパブリッシュする

restart-counter コマンドを機器に送信するには、publishCommand メソッドを使用します。このメソッドに、機器の情報と JSON メッセージを渡します。

if (count >= 4) {
  // Send "restart" command back to this device
  JsonObject data = new JsonObject();
  data.addProperty("name", "restart-counter");
  data.addProperty("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
  client.publishCommand(e.getDeviceType(), e.getDeviceId(), "restart", data);
}
4

アプリケーションを実行する

機器サイドのプログラムとアプリケーション・サイドのプログラムは、どちらも Java アプリケーションとして作成されています。サンプル・ソース・コード・プロジェクトは Maven プロジェクトとしてセットアップしてあり、Maven は実行可能 JAR ファイルをビルドするように構成してあります。この方法により、ビルド後のプログラムはコマンド・ラインから直接実行することができます。

プロジェクトをビルドする

プロジェクトをビルドするには、device ディレクトリー内と application ディレクトリー内の両方で以下のコマンドを実行します。

$ mvn clean package

このコマンドにより、ターゲット・ディレクトリー内にそれぞれ application-1.0.jar ファイル、device-1.0.jar ファイルが作成されます。

アプリケーション・サイドのプログラムを実行する

アプリケーション・サイドのプログラムは以下のようにして実行できます。

$ java -jar target/application-1.0.jar 7z5mnk dw-sample-app a-7z5mnk-3ail3qwogk j-T56vZHOjFdTz3iXQ
Nov 05, 2017 2:26:34 AM com.ibm.iotf.client.AbstractClient createClient
INFO: main: Org ID    = 7z5mnk
         Client ID    = a:7z5mnk:dw-sample-app
Nov 05, 2017 2:26:35 AM com.ibm.iotf.client.AbstractClient connect
INFO: main: Initiating Token based authentication
Nov 05, 2017 2:26:35 AM com.ibm.iotf.client.AbstractClient connect
INFO: main: Connecting client a:7z5mnk:dw-sample-app to ssl://7z5mnk.messaging.internetofthings.ibmcloud.com:8883 (attempt #1)...
Nov 05, 2017 2:26:35 AM com.ibm.iotf.client.AbstractClient connect
INFO: main: Successfully connected to the IBM Watson IoT Platform

これで、アプリケーションが機器からのイベントを待機中の状態になりました。

機器サイドのプログラムを実行する

機器サイドのプログラムを起動した後は、機器から Watson IoT Platform に 5 秒間隔でカウント・イベントが送信されるようになります。カウントが 4 に達すると、アプリケーション・サイドのプログラムによって送信される restart-counter コマンドに応答してカウンターが再起動されます。

繰り返しますが、ここで記載している機器サイドのプログラムは、完全な JVM 機器上で実行されるプログラムを説明するためのものです。組み込み機器上でアプリケーションを実行するには、その機器に対応する別の Watson IoT Platform クライアント・ライブラリーを使用する必要があります。

$ java -jar target/device-1.0.jar 7z5mnk Java_Client  314159  jiw0cK1j84*e_dDjWd
Starting IoT device
Nov 05, 2017 2:26:46 AM com.ibm.iotf.client.AbstractClient createClient
INFO: main: Org ID    = 7z5mnk
         Client ID    = d:7z5mnk:Java_Client:314159
Nov 05, 2017 2:26:46 AM com.ibm.iotf.client.AbstractClient connect
INFO: main: Initiating Token based authentication
Nov 05, 2017 2:26:46 AM com.ibm.iotf.client.AbstractClient connect
INFO: main: Connecting client d:7z5mnk:Java_Client:314159 to ssl://7z5mnk.messaging.internetofthings.ibmcloud.com:8883 (attempt #1)...
Nov 05, 2017 2:26:47 AM com.ibm.iotf.client.AbstractClient connect
INFO: main: Successfully connected to the IBM Watson IoT Platform
Posted event {"count":0}
Posted event {"count":1}
Posted event {"count":2}
Posted event {"count":3}
Posted event {"count":4}
Command:: restart:{"name":"restart-counter","time":"2017-11-05 02:27:07"}
Posted event {"count":0}
Posted event {"count":1}
Posted event {"count":2}
Posted event {"count":3}

機器サイドのプログラムの起動後は、アプリケーション・サイドのプログラムのコンソールにも出力が表示されます。

Event received:: Event [2017-11-05T02:26:47.438-06:00] Java_Client:314159 - status: {"count":0}
Event received:: Event [2017-11-05T02:26:52.374-06:00] Java_Client:314159 - status: {"count":1}
Event received:: Event [2017-11-05T02:26:57.377-06:00] Java_Client:314159 - status: {"count":2}
Event received:: Event [2017-11-05T02:27:02.374-06:00] Java_Client:314159 - status: {"count":3}
Event received:: Event [2017-11-05T02:27:07.377-06:00] Java_Client:314159 - status: {"count":4}
Event received:: Event [2017-11-05T02:27:12.376-06:00] Java_Client:314159 - status: {"count":0}
Event received:: Event [2017-11-05T02:27:17.372-06:00] Java_Client:314159 - status: {"count":1}
Event received:: Event [2017-11-05T02:27:22.371-06:00] Java_Client:314159 - status: {"count":2}
Event received:: Event [2017-11-05T02:27:27.375-06:00] Java_Client:314159 - status: {"count":3}

実行中のアプリケーションを確認する

Watson IoT Platform コンソールから個々の機器に関するイベント・ログも確認できるようになっているので、デバッグする際に便利です。

スクリーンショット
スクリーンショット

まとめと次のステップ

上記のステップに従えば、MQTT プロトコルを介して Watson IoT Platform サービスを利用するアプリケーションを作成することができます。MQTT クライアントのプログラミングに慣れているとしたら、Watson IoT Platform サービスを利用する場合にはプロパティー (MQTT 接続オプション、サブジェクトのパブリッシュおよびサブスクリプション・フォームなど) を正しいフォーマットに準拠させる必要があるという点を除き、大きな違いは見つからないはずです。

既存の MQTT クライアント・ライブラリーをベースとしたソリューションを作成する場合は、Watson IoT Platform サービスを利用すると、ソリューションを作成するプロセスが容易になります。幸い、C、C++、Java、JavaScript、Ruby、Go をはじめとする各種プラットフォームを対象とした、多数のオープンソースの MQTT クライアント・ライブラリーがあります。また、このチュートリアルで使用した Java ライブラリーのように、Watson IoT Platform に固有のクライアント・ライブラリーもいくつかあります。

Watson IoT Platform のサービスでは、接続対象の機器として、Arduino Uno や Raspberry Pi などの多種多様なスマート機器をサポートしています。Web ポータルの資料のセクションに、いくつかのサンプル・コード・プロジェクトが用意されているので詳細を調べてください。このチュートリアルで説明した MQTT のメカニズムは、ソース・コードを理解して変更を加えるのに役立つはずです。機器がリストに含まれていないとしても、心配は要りません。このサンプル・アプリケーションで機器サイドのプログラムを Java で実装したように、MQTT クライアント・ライブラリーを使用すれば、独自のプログラムを簡単に作成することができます。

アプリケーション・サイドのプログラムは Java で実装しましたが、Node.js や Ruby などといった別のプログラミング言語を選択することもできます。アプリケーション・サイドのプログラミングを容易にするには、ローカルでコーディングするのではなく、ブラウザー・ベースのフロー・エディターとして Node-RED を使用することができます。自力で作成するか、Node-RED を使用するかどうかに関わらず、Watson IoT Platform サービスでの MQTT フローは同じように機能します。

謝辞

この記事は、Chun Bin Tang が作成した、2015 年 2 月 18 日に公開された記事をベースとしています。


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Internet of Things
ArticleID=1003672
ArticleTitle=MQTT と「モノのインターネット」サービスを探る
publish-date=05312018