目次


IBM InfoSphere Streams 3.2ハンズオン・ラボ

入門編【ラボ4】

Comments

コンテンツシリーズ

このコンテンツは全#シリーズのパート#です: IBM InfoSphere Streams 3.2ハンズオン・ラボ

このシリーズの続きに乞うご期待。

このコンテンツはシリーズの一部分です:IBM InfoSphere Streams 3.2ハンズオン・ラボ

このシリーズの続きに乞うご期待。

ラボ4 エクスポートしたストリームによるアプリケーションのモジュール設計

このラボでは、オペレーターを追加することで、これまで構築してきたアプリケーションを強化します。これにより、追跡している各車両別に、5 つの観測ごとの平均速度を計算します。その後、Streams Console を使用し、結果を視覚化します。

これまでに使用してきたオペレーターは、各タプルを個別に見ており、履歴を取る必要はありませんでした。しかし、多くの分析処理においては、求められる結果を計算するのに、ある程度の履歴を記憶しなければなりません。ストリーム処理においては、「データセット全体」と呼べるものは存在しませんが、連続するタプルの限られたシーケンスを持つバッファーを定義することはできます。これにより、例えば、1 つ以上の数値属性を持つタプルの制限されたサブセットに対し、平均を計算することができます。このようなバッファーは、ウィンドウと呼ばれます。このラボでは、Aggregate オペレーターを使用して、まさにそうした平均を計算していきます。

4.1 これまでの結果を活用

全員が同じ状態から作業を継続できるように、事前に用意されている次の Studio ワークスペースへ進んでください。

  1. Studio のトップ・メニューから、「ファイル (File)」 > 「ワークスペースの切り替え (Switch Workspace)」 >「/home/streamsadmin/Workspaces/workspace4」の順に選択します。

Studio が終了し、新しいワークスペースで再起動します。ラボ 1 から 3 までで構築したのと同じアプリケーションを伴うプロジェクトが既に使用可能な状態となっています。唯一、プロジェクトに Project4、メイン・コンポジットに MainComposite4 の名前が付いているという点で異なります。「プロジェクト・エクスプローラー (Project Explorer)」で、「Project4」 > 「my.name.space」を展開し、「MainComposite4」をダブルクリックしてグラフィカル・エディターを開きます。

  1. Instance Graph」を表示します。
    1. Streams エクスプローラー (Streams Explorer)」の「InfoSphere Streams 3.2.0.0」を展開します。
    2. インスタンス (Instances)」を展開します。
    3. streams@streamsadmin」を右クリックし、「Show Instance Graph」を選択します。

これまでのラボのジョブで、現在も実行中のものが表示されます。適宜、キャンセルまたは非表示にしてください。

4.2 不測のデータに対するテストの追加

基本的に、フィードから受け取るデータについて検証を実施するべきです。データ形式の定義に問題があったり、不適格なデータは発生するものであり、送信ノイズが入り込む可能性もあります。データが予想に反し、アプリケーションが正常に機能しない状況は誰もが避けたいはずです。

例として、1 つの属性を確認するオペレーターを追加します。その属性は、車両 ID (id) とします。データ・ファイル all.cars では、すべてのレコードに、「Cnnn」 (自動車を意味する「car」の頭文字 “C” 付き) の id 値が含まれています。今回のラボとは違う状況の話になりますが、例えば、ある架空の状況において利用しているアプリケーションがこのラボのような形式を使用する場合、頭文字で示される車両タイプ別 (例えば、バスを意味する「Bus」の頭文字 “B”) に異なるアクションを取れるようにできるというわけです。また、すべての車両 ID がぴったり 4 文字長でなければならないというシステム要件を設定できます。ただし、「異常な」データについては、タプルを何もせずに除去してしまうのではなく、何が起こったかを監査し、後にできればアプリケーションを強化できるよう「異常な」データを保存するのが賢明です。

以下に、仕様をまとめます。

表 1. 仕様: 車両 ID (id 属性) 仕様
基準
頭文字“C”
文字長4

つまり、id として予期されていなかった値を伴うデータが入ってきた場合、プログラムは、無効なデータとして脇へ押しのけることになります。このような状況に対応できるオペレーターがいくつかあり、どのオペレーターを使うかは、ある程度、好みの問題となります。これまでのラボで、既に、この目的にかなったオペレーターを使ってみましたね。そうです、Filter です。ここでは別のオペレーターを使ってみましょう。

Split オペレーター: このオペレーターは、任意の式の評価に応じ、異なる出力ポート (もしくはポートなし) に対してタプルを送信するように設計されています。この式には、必須ではありませんが、入ってくるタプルの属性値を使用することができます。また必要な数だけ出力ポートを保有できます。今回は、必要となる出力ポートは 2 つだけです。1 つは、正常なフロー (「有効」な値) に対応し、もう 1 つは、それ以外 (「無効」な値) に対応します。

Split の仕組みは以下の通りです (下の図 1 の例を参照してください)。

  • N 出力ポートが 0, 1, …, N-1 と番号付けられます。
  • index パラメーターには、64 ビットの整数 (uint64) を返す任意の式が含まれています。
  • この式は、入ってくる各タプルに対して評価されます。
  • 式の値 n は、どの出力ポート p に対してタプルが送信されているかを判断します。
    • n ≥ 0 の場合、p = n mod N
    • n < 0 の場合、タプルは除去されます。
図 1. 出力ポートが 3 つある Split オペレーター
  1. Split オペレーターをグラフに追加します。
    1. MainCompositeNをダブルクリックで開きます。(プロジェクトに表示されていない場合は、一度プロジェクトのビルドを実行してください)
    2. グラフィカル・エディターで、パレット・フィルター・ボックスに split と入力します。Split オペレーターをメイン・コンポジットにドラッグします。
    3. スロットルされたストリーム (Throttle_n の出力) を選択し、ストリームのエンドポイントを Split_n の入力へ動かします。
    4. Split_n から Filter_n に新規ストリームを追加します。
  2. グラフに FileSink を追加します。FileSink の入力を Split オペレーター の 2 つめの出力ポートに接続します。
    1. 前と同じように FileSink をメイン・コンポジットへドロップします。これが、FileSink_n” となります。
    2. 2 つ目の Split ポートとの接続を確立するには、まず、第 2 のポートを追加する必要があります。Split には最低限 1 つの出力ポートがなければなりませんが、追加のものはオプションで、明示的に追加する必要があります。
      1. パレットのフィルター・ボックスを空にします。「設計 (Design)」の下で、「出力ポート (Output Port)」をドラッグし、Split_n 上へドロップします。Split_n にカーソルを合わせると、それが緑色に変わり、ポートをドロップすると、出力側に新しい黄色のボックスが現れるのが分かるかと思います。
      2. 次に、Split_n の 2 つ目の出力ポートから FileSink_n” へ接続を確立します。
  3. Split_n の 2 つの出力に対し、それぞれスキーマを割り当てます。割り当て方法は、既存のストリーム (Aggregate_n の出力を除く) からコピー・アンド・ペーストする方法でもいいですし、LocationType をパレットからドラッグ・アンド・ドロップする方法でもかまいません。
図 2. グラフィカル・エディター
  1. Split オペレーターを構成します。
    1. Split_n を選択し、「プロパティー (Properties)」ビューを開きます (「編集 (Edit)」を右クリック)。
    2. パラメーター (Param)」タブで、「(追加…) Add…」をクリックします。
    3. パラメーターの選択 (Select parameter)」ダイアログで「index」にチェックを入れ、「OK」をクリックします。
    4. index パラメーターの値フィールドで、以下の記載内容をそのまま正確に入力します (注: 0 と 1 の後の「l」は、小文字のエルで、64 ビット整数の「(文字) 長」を意味しています)。
      substring(id,0,1) == "C" && length(id) == 4 ? 0l : 1l
      これによって意味するのは、「オフセットがゼロから始まる長さ 1 の id 属性のサブストリング (つまり、id の頭文字) が「C」であり、id 属性の長さが 4 である場合、ゼロになる。それ以外の場合は 1 になる」ということです。正しい ID は最初のポート (Filter_n) へ行き、それ以外は 2 つ目のポート (FileSink_n”) へ行くということになります。
図 3. プロパティ・ビュー
  1. FileSink_n” を構成します。FileSink をこの前の 2 つのラボで使用してきました。忘れてしまった場合は、そちらで構成方法をおさらいしてみてください。以下のパラメーター値を設定します。
表 2. FileSink_n” のパラメーター値。「flush」というパラメーターが加わっていることに注意してください。
パラメーター
file"error.observations"
flush2u
formatcsv
quoteStringsfalse
  1. アプリケーションを起動し、元の出力 (「プロジェクト・エクスプローラー (Project Explorer)」の「リソース (Resources)」下にある data ディレクトリーの filtered.cars) が以前のように書き込まれているかを確認し、さらに十分な時間を置いた後で、新規出力ファイル (error.observations) に少なくとも 2 つのレコードがあるかを確認します。入力データ・ファイルには、誤った形式の id (およびその他の異常値) を伴う 2 つのレコードが含まれています。

4.3 取り込み (Ingest) モジュールの分割

ここからいよいよ面白くなってきます。Streams アプリケーション内において、データは、高速かつ柔軟なトランスポート・リンクであるストリームをオペレーターからオペレーターへと流れます。Streams アプリケーションの開発者にとって、これらをどのように実装するかが問題となることはありません。異なるホスト、同じホストの異なる PE、もしくは同じ PE 内で実行するオペレーターの間で、それぞれ作用に違いがあるかもしれませんが、グラフのロジックはその影響を受けません。アプリケーションが外部とデータを交換しなければならない場合、ファイルの入出力、ODBC、TCP、UDP、または HTTP 接続、メッセージ・キューなどについて、source および sink オペレーターを明示的に使用する必要があります。

ただし、同じインスタンスで実行する Streams アプリケーションの場合、もうひとつ別のモードでのデータ交換が可能です。それは、エクスポートとインポートです。アプリケーションは、ストリームをエクスポートし、同じインスタンスで実行中の別のアプリケーションで使用可能とすることができます。柔軟な基準に基づき、1 つ以上のアプリケーションがそのようなストリームをインポートできます。これらの、いわゆるアプリケーション・ストリームは、一度接続されると、アプリケーション内の PE 間で実行する他のすべてのストリームと同様、高速かつ柔軟になります。ジョブがサブミットまたはキャンセルされた際にのみ、どのリンクを確立または切断する必要があるかを見極めるため、ランタイム・サービスが関わります。これが終わると、ランタイムの動作が変化することはなく、パフォーマンスにも悪影響を及ぼしません。

柔軟性には、非常に大きな利点があります。アプリケーション・ストリームを、パブリッシュ - サブスクライブ方式の基準に基づいて接続することができ、これにより、開発者は完全なモジュラー・ソリューションを設計できるようになります。1 つのモジュールを、進化、置き換え、削除、あるいは複製する場合でも、その他のモジュールが影響を受けることはありません。個々のモジュールを小さな特殊モジュールとして維持します。

このラボ・シリーズでは、一体構造のアプリケーションを構築してきましたが、それでも論理的な区分があります。アプリケーションの DirectoryScan から Throttle までのフロントエンドは、読み取りデータ (この場合はファイルから) に関係しており、制御された形でデータを「再生」し、リアルタイム・フィードのように見せているのです。アプリケーションにおけるそれ以外の部分 (Split から FileSink まで) は、分析の実施と、結果の書き込みを行います。フロントエンドを別の「Ingest」モジュールに分割すれば、同じ構造と類似したコンテンツを持つものの、全く別のソースから来るタプルを生成する、もう 1 つのモジュールを一緒に、あるいは置き換えとして容易に追加できることは想像に難くないでしょう。これこそ、ラボにおいてこれから取り扱う内容です。別のモジュールを追加して、ライブ・データ・フィードを読み取り、そのデータを、このアプリケーションの別の部分で処理できるようにしてみましょう。

  1. グラフィカル・エディターで、「コンポジット (Composite)」をドラッグし、領域上でドロップします。既存のメイン・コンポジットではなく、グラフィカル・オブジェクトの外にドロップしてください。エディターはこれに Comp_1 と名前を付けますので、「プロパティー (Properties)」ビューで名前を FileIngest に変更してください。

「プロジェクト・エクスプローラー (Project Explorer)」内に新規のコンポジットができたものの、それに関連するビルドがないのが分かるかと思います。ビルドを作成しましょう。

  1. プロジェクト・エクスプローラー (Project Explorer)」で FileIngest メイン・コンポジットを右クリックします。「新規 (New)」 > 「分散ビルド(Distributed Build)」を選択し、表示されたダイアログで、デフォルトのままにし、「OK」をクリックします。
  2. 3 つのフロントエンド・オペレーターを、古いメイン・コンポジットから新しいメイン・コンポジットへ移動します。
    1. 古いメイン・コンポジットで、DirectoryScan_n、FileSource_n、Throttle_n の 3 つのオペレーターを選択します。
      各オペレーターは、Ctrl キーを押し続けながらクリックすることで選択できます。選択できたら、クリップボードへと切り取ります (Cntrl+X キー、もしくは右クリックして「切り取り (Cut)」を選択)。
    2. FileIngest コンポジットを選択し、3 つのオペレーターを貼り付けます (Cntrl+V キー、もしくは右クリックして「貼り付け (Paste)」を選択)。

これで、同じコード・モジュール (SPL ファイル) に、2 つのアプリケーション (メイン・コンポジット) ができました。これは標準的な方法ではありませんが、問題なく機能します。ただし、これらのアプリケーションは完成していません。Throttle_nSplit_n のリンクを切断したためです。

図 4. グラフィカル・エディター
  1. ストリームのエクスポート用に、新しいアプリケーション (FileIngest) を設定します。
    1. パレットで、Export オペレーターを見つけ (フィルター・ボックス内で ex と入力します)、FileIngest コンポジットへドロップします。
    2. ストリームを Throttle_n から Export_n へドラッグします。スキーマは、Throttle_n の出力ポートに属しているため、ストリームがなくても記憶されています。
    3. Export_n を選択し、「プロパティー (Properties)」ビューの「パラメーター (Param)」タブを開きます。
    4. 「追加… (Add…)」をクリックします。properties パラメーターを選択し、「OK」をクリックします。
    5. properties の値フィールドで、以下の「タプルのリテラル」を入力します。
      { category = "vehicle positions", feed = "sample file" }
      図 5. プロパティ・ビュー

      これにより、ストリームが、全く任意の名前と値のペアであるプロパティーのセットを使用して「パブリッシュ」されます (Web ページのタグと同様)。つまり、インポートする側のアプリケーションが、ある一定の「サブスクリプション」 (合致させる必要のあるプロパティー・セット) を満たすストリームを探すことができる、ということです。

    6. 保存します。FileIngest アプリケーションが構築されますが、古い方にはまだエラーが出ます。
  2. ストリームのインポート用に、古い方のアプリケーションを設定します。
    1. パレットで、Import オペレーターを見つけ、これを古い方のメイン・コンポジットにドロップします。
    2. Import_n から Split_n へストリームをドラッグします。
    3. このストリームへスキーマを割り当てます。割り当て方法は、既存のストリーム (Aggregate_n の出力を除く) からコピー・アンド・ペーストする方法でもいいですし、LocationType をパレットからドラッグ・アンド・ドロップする方法でもかまいません。
    4. Import_n を選択し、「プロパティー (Properties)」ビューの「パラメーター (Param)」タブを開きます。必須パラメーターである subscription が既に用意されています。
    5. subscription (プレースホルダー parameterValue) の値フィールドをクリックし、次のブール式を入力します。 category == "vehicle positions"
図 6. プロパティ・ビュー

ここでは 1 つのプロパティーしか探していないことに注意してください。カギとなるカテゴリーと値は "vehicle positions" です。たまたま使用可能になっているもうひとつの方は無視していただいて全く問題ありません。サブスクリプションの述部が満たされると、(ストリーム・タイプが合致していれば) 接続が確立します。

  1. 連携する 2 つのアプリケーションに分けられた、この新たな配置をテストしてみましょう。
    1. Instance Graph」で、残っているジョブをすべてキャンセルします。カラー・スキームを「100 以下のフロー [n タプル/秒] (Flow Under 100 [nTuples/s])」に設定します。表示を拡大し、2 つのジョブを見やすくします。
    2. 「プロジェクト・エクスプローラー (Project Explorer)」で、古いアプリケーション (例: MainComposite4) を起動します。
    3. 新規アプリケーション (FileIngest) を起動します。

2 つのメイン・コンポジットに分かれているにも関わらず、インスタンス・グラフを通して、オペレーターからオペレーターのタプルが流れているのが分かるかと思います。2 つのアプリケーションは実行させたままにしておき、今度は 3 つ目を追加してみましょう。

図 7. グラフィカル・エディター

4.4 ライブ・フィードの追加

ライブデータを取り込むアプリケーションを一から構築する代わりに、既に用意されている Streams プロジェクトをインポートします。このアプリケーションは、インターネット・ツールキットからの InetSource オペレーターを使用します。これにより、NextBus.com からの Web サービス・フィードに接続し、定期的 (30 秒ごと) に、サンフランシスコ市営のバスと路面電車の現在のロケーション、速度、進行方向をダウンロードします。データの解析、フィルター、および変換を行い、結果がファイル・データと同じように見える (いくつかの相違点は残ります) ようにします。処理アプリケーションのサブスクリプションと一致するプロパティー・セットで結果のストリームをエクスポートします。NextBus アプリケーションを起動すると、自動的に接続が確立し、ジョブをキャンセルするまでデータが継続的に流れます。

  1. まず、Studio ワークスペースで、インターネット・ツールキットが使用可能な状態であることを確認します。確認方法はいくつかありますが、「ビッグデータ用タスク・ランチャー (Task Launcher for Big Data)」を使用して確認するのが簡単です。ラボ環境では、Streams でインストールされたツールキットが既に使用可能となってはいますが、念のためにこの手順を踏みます (何度使用可能に設定しても害はありません)。
    1. タスク・ランチャー (Task Launcher)」がまだ開いていれば、それをクリックしてください。開いていなければ、Eclipse のトップ・メニューで「ヘルプ (Help)」 > 「ビッグデータ用タスク・ランチャー (Task Launcher for Big Data)」を選択します。
    2. 概要 (Overview)」タブの「ファースト・ステップ (First Steps)」下にある「SPL ツールキットを使用可能にする (Make SPL Toolkits available)」をクリックします。
    3. 以上で確認完了です。「タスク・ランチャー (Task Launcher)」は閉じていただいて結構です。
  2. NextBus プロジェクトをインポートします。
    1. Eclipse トップ・メニューで、「ファイル (File)」 > 「インポート… (Import…)」を選択します。
    2. インポート・ダイアログで、「InfoSphere Streams Studio」 > 「SPL プロジェクト (SPL Project)」を選択し、「次へ (Next)」をクリックします。
    3. 参照… (Browse…)」をクリックします。「My Home」をクリックして展開し、「StreamsExamples」を選択して、「OK」をクリックします。
      図 8. ソースのインポート1
    4. NextBus のみを選択し、「完了 (Finish)」をクリックします。
      図 9. グラフィカル・エディター
  3. アプリケーションを起動します (プロジェクトのビルドが完了するまで待つ必要があるかもしれません)。
  4. 「Instance Graph」を最大化し、調整します。

ここでは、3 つのアプリケーションがすべて接続されているはずです。Data ディレクトリーに追加のファイル情報をコピーするたびに FileIngest ジョブからタプルが流れます。30 秒のバーストで NextBus ジョブからタプルが流れます。error.observations ファイルに、NextBus からのレコードが徐々に溜まっていきます。これらの車両 ID は、「Cnnn」形式に合致していません (ファイルを定期的にリフレッシュしてください。ファイルを表示しているエディターをクリックし、「ファイル変更 (File Changed)」ダイアログで「はい (Yes)」をクリックします。このダイアログは、基礎コンテンツが変更されたことを Studio が検知すると表示されます)。

図 10. グラフィカル・エディター
図 11. error.observations ファイル

4.5 まとめ

このラボを通してご紹介したのは、Streams で実現しうることのほんの一部に過ぎません。ここで構築したアプリケーションは、ラボを進めるにつれ徐々に興味深くなっていきましたが、その構築には、ほんのわずかな SPL 式を除いて、コード化を用いることはありませんでした。用意された、非常にシンプルなモジュールの助けを借りつつ、最終的にこのアプリケーション・セットは、車両ロケーション観測の継続的な流れに対処でき、異なるコンテンツを区別できるようになるに至りました。単に、少数のビルディング・ブロックを使用し、タプル・フローの仕組みを管理するグラフをスケッチして、プロパティー・ビューにおける少数のパラメーターを設定するだけでも、十分に強力なソリューションの基礎が形成されたのです。

次のステップは、皆さんが以下の点について判断を下すことです。データ・ストリーム用のこのアプリケーション開発プラットフォームが、ご自身の業務を強化するソリューションの構築に役立つか。このラボで経験したことが、解決したいと思っていた問題の解決に役立つか。ご自身の組織で、必要な開発スキルが利用できるか、あるいはスキル構築に投資すべきか。IBM では、この製品に関する正式な研修を提供しております。また、お客様技術担当員による、ストリーム・データに関連するソリューションの機会の特定、ワークショップの開催、ならびにパイロット・プロジェクトやそれ以上のレベルへのステップ・アップなど、さまざまな支援も実施しております。

それでは、ストリームをお楽しみください。


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Information Management, Cloud computing, Linux
ArticleID=969279
ArticleTitle=IBM InfoSphere Streams 3.2ハンズオン・ラボ: 入門編【ラボ4】
publish-date=05072014