目次


プロ・ゴルフとプロ・テニスのためのプレディクティブ・クラウド・コンピューティング

第 3 回 処理中のビッグデータ

Comments

コンテンツシリーズ

このコンテンツは全#シリーズのパート#です: プロ・ゴルフとプロ・テニスのためのプレディクティブ・クラウド・コンピューティング

このシリーズの続きに乞うご期待。

このコンテンツはシリーズの一部分です:プロ・ゴルフとプロ・テニスのためのプレディクティブ・クラウド・コンピューティング

このシリーズの続きに乞うご期待。

プロ・ゴルフ・トーナメントとプロ・テニス・トーナメントの開催期間中は、クラウド・コンピューティング環境を介したさまざまなデータ・ストリームによって、ファンに没入型のデジタル・エクスペリエンスが提供されます。プレディクティブ・クラウド・コンピューティング・システムは、この大規模なデータ、つまり「処理中のビッグデータ」にアナリティクスを適用し、将来の試合の状態に関する予測子を抽出するとともに、ソーシャル・スコアボードにプレイヤーの人気度を表示します。このストリーム・コンピューティングのテクノロジーには、以下の要素が関連します。

  • 自然言語処理
  • IBM InfoSphere Streams
  • UIMA (非構造化情報管理アーキテクチャー) PEAR (処理エンジン・アーカイブ)
  • Twitter
  • Bluemix
  • IBM Content Analytics

ストリーム・コンピューティングは、それぞれのスポーツ・イベント開催中に、プレディクティブ・クラウドのリソース管理とライブ・ソーシャル・スコアボードに関して集計したリアルタイムの統計情報を生成します。

ストリーム・コンピューティングのアーキテクチャーとデータ

現在出場しているプレイヤーの人気度とプレイの傾向を提供するため、さらには現在計画している対象期間より前にクラウド・リソースのプロビジョニングを行えるようにするために、PCC 内でスポーツに関するデータとソーシャル・データの分析処理がリアルタイムで行われるようにしなければなりません。待ち時間を伴うことのない膨大なデータ・アナリティクスを実現するために、情報はストリームの中を通って特定のオペレーター (つまり、処理モジュール) にプッシュされ、アトミックな機能が実行されると再びストリームに戻されて、さらなる処理や出力が行われます。IBM InfoSphere Streams は、SPL (Streams Processing Language) という専用のストリーム処理言語を提供しているほか、カスタムの処理を行うためにネイティブの C/C++ および Java のバインディングもサポートしています。リアルタイムのデータを分析するための SPL および Java によるオペレーターが数多く用意されたプレディクティブ・クラウド・コンピューティングでは、ストリームを広範に使用します。

InfoSphere Streams が実行されるときには、有向グラフという形がとられます。最初は、1 つ以上の「ソース」オペレーターがデータを生成するか、外部データ・ソース (ローカル・ファイル、リモート・データベース、ソケット、HTTP プロトコル、カスタム・ソースなど) からデータを受信するかのいずれかを行います。データが取り込まれると、送られてくるデータを操作するファンクターとして機能するオペレーターに、そのデータは送られます。オペレーターは、データを拒否できるほか、特定の時間枠でデータを集計することや、さらなるデータを混ぜること、あるいはデータを別のフォーマットに変換することなどができます。

プレディクティブ・クラウド・プロジェクトでのこのワークフローの一例は、図 1 に示す Twitter データの処理です。この場合、ソース・オペレーターは Twitter GNIP API に接続して、ライブ・ツイートから JSON テキストを受け取ります。このテキストは、次のオペレーターによって解析され、各ツイートに関連付けられている大量のメタデータから関連するデータ・フィールドだけが JSON に抽出されます。そこから先は、データは 2 つのパスに従います。一方のパスでは、ツイートは (到着時刻で判断した) 一定の時間枠で集計され、ツイートの総数とその時間枠のタイム・スタンプだけが保管されます。60 秒間集計された後、その数とタイム・スタンプが UDP メッセージ・シンクを介して Graphite に POST 送信されます。もう一方のパスでは、ツイートのデータが HTTP POST シンクに送信され、そこから HTTP POST リクエストが RESTful API に送信されて、ツイート情報がデータベースに保管されます。

InfoSphere Streams は、データを操作するための多数の組み込みオペレーター、シンク、ソースを提供しているものの、あらゆる使用ケースを網羅することはできません。例えば、上述の Twitter ワークフローの場合、HTTP ポスト・リクエストを行うためのオペレーターは InfoSphere Streams にはありません。そのため、私たちはプロデューサー/コンシューマー・モデルに従って、送られてくるツイート・データと送られていくリクエストを処理する Java オペレーターを作成しました。時間のかかる HTTP リクエストが Streams の実行フローをブロックしないよう、送られてくる Twitter データはキューに入れるようにしました。データを RESTful サービスに送信するための HTTP リクエストは、別個の実行スレッドで実行します。InfoSphere Streams には Java API ツールキットが用意されており、このツールキットはカスタム・オペレーターが継承します。カスタム・オペレーターは InfoSphere Streams の第 1 級市民として、ネイティブ・オペレーターとまったく同じように動作します。カスタム・オペレーターは、自身に渡されるすべてのパラメーターを使用するように構成することができ、それぞれに固有の JVM 内でサンドボックス化されます。

図 1 には、データ・ソースに接続する 5 種類のストリーム・コンピューティング・ジョブが示されています。これらのジョブは、最終的に RESTful Web サービス、ディスク、または別のジョブに出力を提供します。一般に、各処理要素が受け取るタプルには、結果のオブジェクトが含まれるタプルや、シグナルのタイプ (ウィンドウなど) を表すパンクチュエーションが含まれます。各ジョブには、SPL でプログラミングされた有効グラフによって関連付けられた 1 つ以上の処理要素を含めることができます。

図 1. PCC のストリーム・コンピューティング・アーキテクチャーの全体
ストリーム・コンピューティング・アーキテクチャーを示す図
ストリーム・コンピューティング・アーキテクチャーを示す図

ソーシャル・スコアボードをサポートするのは、図 1 のジョブ 0、1、2、および 3 です。ジョブ 0 は、ツイートをプルする対象の HTTP サービスを記述する入力パラメーターを受け取ります。スポーツ・イベントについては、指定されたユーザーとパスワードを使用して Twitter の Powertrack に接続するセキュア HTTP が、オプションのランタイム・パラメーターで定義されたストリーム・ソースのヘッダーに組み込まれます。パラメーターが指定されていない場合は、デフォルトの値が指定されます (リスト 1 を参照)。オペレーターの出力は、rstring という形のツイートからなる TwitterStream という名前のストリームです。他のコンシューマー・ジョブ (ジョブ 1、2、3 など) によって識別可能なように、このストリームは、GNIPStream ID を設定してエクスポートされます。

リスト 1. SPL によるジョブ 0 の Twitter 接続
namespace application ;

use httputils.com.ibm.ssb.inet.http::* ;

//Reads the GNIP Twitter Stream and exports the raw text to other
//Stream instances on the same server instance
composite TwitterStreamExporter
{
    param
        expression<rstring> $protocol : getSubmissionTimeValue("protocol", "https") ;
        expression<rstring> $host : getSubmissionTimeValue("host", "<host>") ;
        expression<rstring> $baseurl : getSubmissionTimeValue("baseurl",”<URL ending>") ;
        expression<rstring> $userid : getSubmissionTimeValue("userid", "<userid>") ;
        expression<rstring> $password : getSubmissionTimeValue("password", "<password>") ;
    type

        TwStreamT = rstring gnip, rstring body, rstring actor, rstring id,
            rstring object, rstring postedTime ;
    graph

        stream<rstring tweet> TwitterStream as TwSHTTPReaderOp = HTTPGetStreamSource()
        {
            param
                protocol : $protocol;
                host : $host;
                baseurl : $baseurl;
                userid : $userid;
                password : $password;
        }

        () as ExportOperator = Export(TwitterStream)
        {
            param
                streamId : "GNIPStream";
        }
}

ジョブ 1 (リスト 2 を参照) は、指定されたインポート・オペレーターで GNIPStream をインポートし、ストリームに TwitterStream という名前を付けます。続いて、Java コードで作成されたカスタム・オペレーターが JSON ストリングをタプルに変換します。この JSONToTuple オペレーターを定義しているのが、リスト 3 に示すオペレーター・モデルの XML ファイルです。manageLibrary セクションに、必須ライブラリーの場所が定義されています。オペレーターに入力されるすべてのパラメーターに加え、メトリクスも記述されています。メトリクスとしては、処理エンジンのアクティビティーをモニターするための静的状態 (カウンターやゲージなど) が提供されています。JSONToTuple オペレーター・モデルに含まれているのは、パラメーターだけです。最後に、入力ポートと出力ポートが指定されています。特に重要なのは、パンクチュエーションに関する動作です。これらの指示によって、新しいパンクチュエーションを生成するか、無視するか、あるいは次の処理エンジンに渡すかどうかが決まるためです。

TwitterOperator は TwitterTupleStream を受け取って、プレイヤーごとの 1 分あたりのツイート総数を集計します。集計された結果は、DB2 に保管するために RESTful サービスに送信されます。Graphite のパラメーターは、時系列データベースへの UDP 接続を確立するために使用されます。アグリゲーター・オペレーターにより、1 分間の時間枠でツイートの総数が集計されます。ファンクターが指定する、SPL で作成されたカスタム・オペレーターが、Graphite のターゲット・スペースを設定します。最終的に生成される GraphiteLogProcessingRate は、UDPSink にダンプされます。

リスト 2. SPL によるジョブ 1 の Twitter 集計
namespace application ;

use JSONHelpers.com.ibm.ssb.parsers.json::JSONToTuple ;
use TwitterOperator.com.ibm.ei.twitter::TwitterOperator ;

composite TwitterStreamReader
{
    param
        expression<rstring> $playerSummaryURL : getSubmissionTimeValue("playerSummaryURL");
        expression<rstring> $tweetSummaryURL : getSubmissionTimeValue("tweetSummaryURL") ;
        expression<rstring> $siteTag : getSubmissionTimeValue("siteTag") ;
        expression<rstring> $siteTagPlayerDelimiter : getSubmissionTimeValue("siteTagPlayerDelimiter") ;
        expression<rstring> $graphiteHost : getSubmissionTimeValue("graphiteHost", "<host>");
        expression<int32>   $graphitePort : (int32)getSubmissionTimeValue("graphitePort", "<port>");
        expression<rstring> $graphiteMetricName : getSubmissionTimeValue("graphiteMetricName");
        expression<rstring> $siteName : getSubmissionTimeValue("siteName") ;
        expression<rstring> $siteYear : getSubmissionTimeValue("siteYear") ;
        expression<rstring> $plexId : getSubmissionTimeValue("plexId") ;
    type

        TwStreamT = rstring gnip, rstring body, rstring actor, rstring id,
            rstring object, rstring postedTime, rstring retweetCount ; // int64 id; 

    graph

        stream<rstring tweet> TwitterStream = Import()
        {
            param
                applicationName : "application::TwitterStreamExporter";
                streamId : "GNIPStream";
        }

        //convert json string to tuple
        stream<TwStreamT> TwitterTupleStream = JSONToTuple(TwitterStream)
        {
            param
                continueOnError : true ;
        }

        //Process json tuples
        () as TwitterSink = TwitterOperator(TwitterTupleStream)
        {
            param
                tweetSummaryURL : $tweetSummaryURL;
                playerSummaryURL : $playerSummaryURL;
                siteTag : $siteTag;
                siteTagPlayerDelimiter : $siteTagPlayerDelimiter;
                siteName : $siteName;
                siteYear : $siteYear;
                plexId : $plexId;
                graphiteHost : $graphiteHost;
                graphitePort : $graphitePort;
        }
        
        stream<int32 tweetsProcessed> TweetProcessingRate = Aggregate(TwitterStream)
        {
            window
                TwitterStream : tumbling, time(1.0);
            output
                TweetProcessingRate : tweetsProcessed = Count();
            config 
                restartable : true;
        }
        
        stream<rstring data> GraphiteLogProcessingRate = Functor(TweetProcessingRate)
        {
            output
                GraphiteLogProcessingRate: data = $graphiteMetricName + ":" + ((rstring)tweetsProcessed) + "|c";
        }
        
        () as UDPSink1 = UDPSink(GraphiteLogProcessingRate)
        {
            param
                address : $graphiteHost;
                port : (uint32)$graphitePort;
        }
}

JSONToTuple オペレーターは、リスト 3 に記載するオペレーター・モデルの XML ファイルに定義されています。

リスト 3. JSONToTuple オペレーター・モデル
<operatorModel xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator" xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common" xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
  <javaOperatorModel>
    <context>
      <description></description>
      <executionSettings>
        <className>com.ibm.streams.JSONToTuple</className>
        <vmArgs/>
      </executionSettings>
      <libraryDependencies>
        <library>
          <cmn:description>Java operator class library</cmn:description>
          <cmn:managedLibrary>
            <cmn:libPath>../../lib/JSON4J.jar</cmn:libPath>
            <cmn:libPath>../../impl/java/bin</cmn:libPath>
          </cmn:managedLibrary>
        </library>
      </libraryDependencies>
    </context>
    <parameters>
      <parameter>
        <name>dataParamName</name>
        <description docHref="" sampleUri="">Name of the parameter carrying the JSON string. </description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>continueOnError</name>
        <description>Default is false. If true, an empty tuple is output if a parsing error occurs.</description>
        <optional>true</optional>
        <type>boolean</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>addJSONString</name>
        <description>Adds the source JSON string to the specified attribute. Note that if that attribute was populated by parsing the json, then it will be overwritten.</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>copy</name>
        <description>Names of fields that need to be copied over to the output port. Note that these fields will NOT be populated by the JSON parsing.</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>-1</cardinality>
      </parameter>
      <parameter>
        <name>target</name>
        <description docHref="" sampleUri="">The operator will attempt to parse the json based on an attribute in the main tuple on output port 0. This enables auto-copying of all other common fields from input to output.</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>protectedPrefix</name>
        <description>Prefix added to attributes with the same name as protected SPL keywords. Default &quot;__pr_&quot;</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
    </parameters>
    <inputPorts>
      <inputPortOpenSet>
        <windowingMode>NonWindowed</windowingMode>
        <windowPunctuationInputMode>Oblivious</windowPunctuationInputMode>
      </inputPortOpenSet>
    </inputPorts>
    <outputPorts>
      <outputPortOpenSet>
        <windowPunctuationOutputMode>Generating</windowPunctuationOutputMode>
        <windowPunctuationInputPort>-1</windowPunctuationInputPort>
      </outputPortOpenSet>
    </outputPorts>
  </javaOperatorModel>
</operatorModel>

ジョブ 2 はオプションであり、すべてのツイートをディスクに書き込むために使用されます。FileSink オペレーターは、標準的なストリーム・コンピューティング API オペレーターであり、ディスク上でツイートを書き込む場所を決定します。Big Data Toolkit がインストールされてリンクされている場合は、SPL コンパイル・ステージで HDFSFileSink を呼び出して Hadoop Distributed File シンクにツイートを取り込むこともできます。HDFSFileSink の名前空間は com.ibm.streams.bigdata.hdfs です。オペレーターを実行するマシンには、HDFS コマンド・ライン・ツールもインストールしなければなりません。ジョブ 3 は、TwitterGNIPStream をインポートし、各プレイヤーに対するソーシャル・センチメントを測定し、集計した結果を JSON ファイルに送信します。この JSON ファイルは、ディスクまたはオブジェクト・ストレージに保管されます。

ソーシャル・センチメント・スコアボードと SPL

図 2 に示されているジョブ 3 は、ツイートに関連付けられているスポーツ・トーナメントの出場プレイヤーごとに、ソーシャル・センチメントのスコアを付けます。このジョブには、カスタム Java オペレーター、カスタム SPL オペレーター、そして Streams API が提供するオペレーターの組み合わせを使用する 20 の処理要素が含まれています。ジョブ 0 によってエクスポートされる TwitterStream が、ジョブ 1 によってインポートされ、JSON からタプルに変換されます。このタプルのテキストに自然言語の処理フィルターが適用された後、ストリーミング・システム保護が呼び出されます。無駄なサイクルが生じるのを防いで、リアルタイム処理にとってのリスクを軽減するために、expired パラメーターによって古いツイートがダウンストリームに流れ続けるのを防ぎます。ツイートがすべてのフィルターと有効期限切れのチェックを通過した場合は、並列処理エンジンがソーシャル・センチメント分析用の UIMA PEAR ファイルをインストールして起動します。分析結果は解析されて、タプルが生成され、特定のトーナメントのプレイヤー ID と結合されます。特定の時間枠 (例えば、5 分間) にわたり、プレイヤーごとにプラスとマイナスのパーセンテージが累積されていきます。タプルは最終的に JSON に変換されて、オブジェクト・ストレージに保管されます。

図 2. ソーシャル・センチメント・ストリーム・ジョブの処理要素のフロー
処理要素のフローを示す図
処理要素のフローを示す図

TwitterStream は、JSONToTuple という名前のカスタム Java オペレーターによって TwitterTupleStream に変換されます。JSONToTuple オペレーターは、use JSONHelpers.com.ibm.ssb.parsers.json::JSONToTuple ステートメントによって、複合 SPL コードに組み込まれ、このステートメントが参照するオペレーター・モデルによって、オペレーター JAR ファイルが組み込まれます。TwitterTupleStream からは feed という名前のストリームが作成されて、別の複合オペレーターに入力されます。リスト 4 に、別の複合 SPL オペレーターに入力されるストリーム・パラメーター feed を示します。

リスト 4. FeedProcessing 複合 SPL フローへの feed ストリームの入力
//GNNIPRacketStream.spl
stream<ustring text, timestamp created_at> feed = GNIPStreamParser(TwitterTupleStream)        
        {
        }

        // --- Processing the feed        
        () as proc = FeedProcessing(feed){        
            param        
                inputTextField: "text";        
        }        

//FeedProcessing.spl
namespace com.ibm.incubator.rs ;

use JSONHelpers.com.ibm.ssb.parsers.json::TupleToJSON;
use ObjectStorage.com.ibm.store::ObjectStoreRetrieveResource;

composite FeedProcessing(input feed )
{
….

入力ストリーム feed は TransitiveWindower と名付けられたカスタム Java オペレーターに取り込まれ、このカスタム・オペレーターによって出力ストリーム transitiveWindower にパンクチュエーションが追加されます。パンクチュエーションは、ランタイム・パラメーターまたはデフォルト・パラメーターによって 5 分に定義されるため、この時間枠の境界が出力ストリーム上に設定されます。各パンクチュエーションは後でセンチメントを集計するために使用されます。ツイートをフィルタリングするカスタム Java オペレーターは、NonLatinLettersRateCalculator という名前の、リスト 5 に示すオペレーターです。このオペレーターによって生成されるストリームが SPL カスタム・オペレーターに入力されて、現在のシステム時刻で修飾された、パス・スルー・パンクチュエーションとタプルからなるストリームが生成されます。

リスト 5. NonLatinLettersRateCalculator カスタム Java オペレーターのコード
package com.ibm.incubator.rs.operators;

import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Tuple;

public class NonLatinLettersRateCalculator extends BaseOperator {

    public static final String TA_RATE = "rate";

    @Override
    public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        StreamingOutput<OutputTuple> out = getOutput(0);
        OutputTuple t = out.newTuple();

        String message = tuple.getString(com.ibm.incubator.rs.operators.GNIPStreamImporter.TA_TEXT);
        
        int letters = 0;
        int nonLatinLetters = 0;
        for(int i = 0; i < message.length(); i++) {
            char c = message.charAt(i);
            if(Character.isLetter(c)) {
                letters++;
                if(c > 128) {
                    nonLatinLetters++;
                }
            }
        }
        float rate = (float) nonLatinLetters / letters;
        
        t.assign(tuple);
        t.setFloat(TA_RATE, rate );
        out.submit(t);
    }
}

ラテン文字でフィルタリングされたストリームは、ソーシャル・センチメント分析の前にツイートがネットワークに配信された時刻を調べるオペレーターに入力されます。ツイートがパラメーター化されたしきい値を超えている場合は、現在のストリームから除外されます。feed_non_delayed という名前の出力ストリームは 3 つのストリームに分割されてカスタム USOpenSentimentAnalyser に入力されます (リスト 6 を参照)。

リスト 6. UIMA ソーシャル・センチメント分析用に 3 つのストリームに分割される、遅延のない Twitter ストリーム
// --- Splitting feed stream
        (stream<I> feed1 ; stream<I> feed2 ; stream<I> feed3) = Split(feed_non_delayed
            as I)
        {
            logic
                state : mutable int64 i = 0 ;
                onTuple I :
                {
                    i ++ ;
                }

            param
                index : [ i % 3l ] ;
        }

        // --- Sentiment analysis
        stream<AnalysisResult result> analysed1 = USOpenSentimentAnalyser(feed1 as I)
        {
            param
                pearFile : $packagedPearPath; 
                localWriteLocation : $pearDropPath ;
                installDir : "/tmp/pear_install" ;
                inputTextField : $inputTextField ;
                parallelization : 1 ;
                expiredSeconds : $expiredSeconds ;
                storeObjectName : $objectStorePearFile ;
                storeConfigFile : $objectStoreConfigPath ;
                storeContainer : $container ;
                storeVersion : $storeVersion ;
                refreshRateSeconds : $refreshRateSeconds ;
        }

        stream<AnalysisResult result> analysed2 = USOpenSentimentAnalyser(feed2 as I)
        {
            param
                pearFile : $packagedPearPath; 
                localWriteLocation : $pearDropPath ;
                installDir : "/tmp/pear_install" ;
                inputTextField : $inputTextField ;
                parallelization : 1 ;
                expiredSeconds : $expiredSeconds ;
                storeObjectName : $objectStorePearFile ;
                storeConfigFile : $objectStoreConfigPath ;
                storeContainer : $container ;
                storeVersion : $storeVersion ;
                refreshRateSeconds : $refreshRateSeconds ;
        }

        stream<AnalysisResult result> analysed3 = USOpenSentimentAnalyser(feed3 as I)
        {
            param
                pearFile : $packagedPearPath; 
                localWriteLocation : $pearDropPath ;
                installDir : "/tmp/pear_install" ;
                inputTextField : $inputTextField ;
                parallelization : 1 ;
                expiredSeconds : $expiredSeconds ;
                storeObjectName : $objectStorePearFile ;
                storeConfigFile : $objectStoreConfigPath ;
                storeContainer : $container ;
                storeVersion : $storeVersion ;
                refreshRateSeconds : $refreshRateSeconds ;
        }

USOpenSentimentAnalyzer はリスト 7、リスト 8リスト 9 によって、Streams ビルドまたは Streams アプリケーション・バンドル (SAB) ファイルに含まれる、UIMA PEAR をインストールします。指定されたリフレッシュ・レートに基づいてリモート PEAR がオブジェクト・ストレージからプルされて、パッケージ PEAR MD5 ハッシュと比較されます。ハッシュが異なる場合、リモート PEAR はインストールされた PEAR をリフレッシュによって無効にします。このようにして、PEAR ファイルはオペレーターによってリモートからリアルタイムで更新することができます。変更中の PEAR ファイルはロックされ、各 Java プロセスが処理要素 ID を使用して独自のディレクトリー構造を作成するため、各プロセスは互いに独立することになります。

リスト 7. UIMA PEAR ファイルは、UIMA ソーシャル・センチメント分析をサポートするためにリフレッシュされてインストールされます
private void refreshAnalyzer(String pearFile, String installFolder, int parall, boolean toLock) throws Exception {
        // Initializing UIMA analyzer
        if (analyzer != null) {
            analyzer.shutdown();
        }
        @SuppressWarnings("resource")
        FileChannel fileChannel = new RandomAccessFile(new File(pearFile), "rw").getChannel();
        @SuppressWarnings("resource")
        FileLock lock = toLock ? getFileLockWithAttempts(fileChannel, MAXIMUM_FILE_LOCK_ATTEMPTS) : null;

        analyzer = new USOpenSentimentsAnalysis();
        try {
            try {
                analyzer.init(new File(pearFile), FileUtils.getDirectory(installFolder), parall);
            } catch (RuntimeException e) {
                error(e, "Cannot initialize UIMA analyzer.");
                throw new Exception(e);
            }
        } finally {
            if (toLock) {
                lock.release();
                fileChannel.close();
            }
        }
    }
リスト 8. UIMA ソーシャル・センチメント・アナライザーは、タプルを処理する前にリソースを初期化します
@Override
    public synchronized void initialize(OperatorContext context) throws Exception {
        super.initialize(context);

        processingEngineID = new Integer(context.getPE().getPEId().intValue()).toString();

        // Reading parameters
        String pearFilePath = getReqParam(P_PEARFILE);
        String installFolder = getReqParam(P_INSTALLDIR);
        String writeLocation = createUniqueResourceDrop(getReqParam(P_LOCAL_WRITE_LOCATION), processingEngineID);
        int parall = getParamAsInt(P_PARALLELIZATION, true, DEF_PARALLELIZATION);

        // Write the packaged file to local disk
        File writeLocationFile = new File(writeLocation);
        String writeLocationPath = writeLocationFile.getPath().substring(0,
                writeLocationFile.getPath().lastIndexOf(File.separator));
        if (!writeLocationFile.exists()) {
            if (new File(writeLocationPath).mkdirs()) {
                RESULT_LOG.trace("Created the following directories: " + writeLocationPath);
            }
            writeLocationFile.createNewFile();
        }
        try (@SuppressWarnings("resource")
        FileChannel fileChannel = new RandomAccessFile(writeLocationFile, "rw").getChannel();
                FileLock lock = getFileLockWithAttempts(fileChannel, MAXIMUM_FILE_LOCK_ATTEMPTS);) {
            try (final InputStream fis = Files.asByteSource(new File(pearFilePath)).openStream();) {
                ByteBuffer writeBuf = ByteBuffer.wrap(IOUtils.toByteArray(fis));
                fileChannel.truncate(0);
                while (writeBuf.hasRemaining()) {
                    fileChannel.write(writeBuf);
                }
            }
        }

        // Initializing UIMA analyzer
        refreshAnalyzer(writeLocation, installFolder, parall, true);

        inputTextField = getReqParam(P_INPUT_TEXT_FIELD);
        Integer seconds = getParamAsInt("expiredSeconds", true, 180);
        MDC.put("id", getOperatorContext().getName());
        getOperatorContext().getMetrics().getCustomMetric(M_EXPIRED_TIME).setValue(seconds);
        int refreshRateSeconds = getParamAsInt(P_REFRESH_RATE_SECONDS, true, DEFAULT_REFRESH_RATE_SECONDS);
        getOperatorContext().getMetrics().getCustomMetric(M_PEAR_REFRESH_TIME).setValue(refreshRateSeconds);
        delta = new Timestamp(seconds, 0);
    }
リスト 9. Java コードに含まれる UIMA ソーシャル・センチメント・アナライザーのロジック
/** Input text processing. */
    @Override
    public synchronized void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        long time = System.currentTimeMillis();
        Timestamp createdAt = tuple.getTimestamp(GNIPStreamImporter.TA_CREATED_AT);
        String text = tuple.getString(inputTextField);
        int refreshRateSeconds = getParamAsInt(P_REFRESH_RATE_SECONDS, true, DEFAULT_REFRESH_RATE_SECONDS);
        String localWriteLocation = createUniqueResourceDrop(getReqParam(P_LOCAL_WRITE_LOCATION), processingEngineID);
        String installFolder = getReqParam(P_INSTALLDIR);
        int parall = getParamAsInt(P_PARALLELIZATION, true, DEF_PARALLELIZATION);
        if (createdAt.before(lastPunctuation.subtract(delta))) {
            incMetricValue(M_SKIPPED);

            long skippedCount = getOperatorContext().getMetrics().getCustomMetric(M_SKIPPED).getValue();
            long curMaxDifference = getOperatorContext().getMetrics().getCustomMetric(M_MAX_TIME_DIFFERENCE).getValue();
            long curDifference = lastPunctuation.subtract(createdAt).getSeconds();

            difference += curDifference;
            if (curDifference > curMaxDifference) {
                getOperatorContext().getMetrics().getCustomMetric(M_MAX_TIME_DIFFERENCE).setValue(curDifference);
            }

            getOperatorContext().getMetrics().getCustomMetric(M_LAST_TIME_DIFFERENCE).setValue(curDifference);

            getOperatorContext().getMetrics().getCustomMetric(M_AVG_TIME_DIFFERENCE)
                    .setValue(difference / skippedCount);
            MDC.put("id", getOperatorContext().getName());
            RESULT_LOG.info(
                    "Skipping: \"{}\" \n TWITTER CREATION TIME: {} ms, WINDOW TIME: {} ms \n EXPIRED TIME: {} sec WITH DELTA: {}",
                    new Object[] { text, createdAt.getTime(), lastPunctuation.getTime(), curDifference, delta });
            return;
        }

        if ((lastPearRefresh.add(new Timestamp(refreshRateSeconds, 0)).before(Timestamp.currentTime()))) {
            RESULT_LOG.trace("Attempting to refresh the UIMA pear file.");
            if (refreshResourceFile(processingEngineID)) {
                refreshAnalyzer(localWriteLocation, installFolder, parall, true);
            }
            lastPearRefresh = Timestamp.currentTime();
        }
        // Performing analysis
        AnalysisResult result = new AnalysisResult();
        try {
            analyzer.analyze(text, result);
        } catch (Exception e) {
            error("Error while analysis: %s", e.toString());
            RESULT_LOG.info("Error while analysis: %s", e.toString());
            return;
        }
        MDC.put("id", getOperatorContext().getName());
        incMetricValue(M_ANALYZED);
        // RESULT_LOG.info("--------------------");// 20 '-'

        if (!result.isEmpty()) {
            incMetricValue(M_FOUND);
        }
        // Producing output
        int numOutputs = getOperatorContext().getNumberOfStreamingOutputs();
        for (int i = 0; i < numOutputs; i++) {
            StreamingOutput<OutputTuple> output = getOutput(i);
            OutputTuple outTuple = output.newTuple();
            if (!result.isEmpty()) {
                outTuple.setMap(TA_RESULT, result.getSentiments());
                output.submit(outTuple);
            }
        }
        time = System.currentTimeMillis() - time;
        getOperatorContext().getMetrics().getCustomMetric(M_ANALYSIS_TIME).incrementValue(time);
        long anTime = getOperatorContext().getMetrics().getCustomMetric(M_ANALYSIS_TIME).getValue();
        long anNumb = getOperatorContext().getMetrics().getCustomMetric(M_ANALYZED).getValue();
        getOperatorContext().getMetrics().getCustomMetric(M_AVG_TIME)
                .setValue((long) Math.floor(anTime / (double) anNumb + 0.5));
    }

analysis ストリームのそれぞれが結合されて analysisResult ストリームになります。パンクチュエーションが検出されるまで、センチメントが集計されて一時ストレージに保管されます。パンクチュエーション・タプルが検出された時点で、AggregatedSentimentsCalc が PlayerCSVRetrieval Java カスタム・オペレーターに出力されます。PlayerCSVRetrieval は SAB ファイル内の players.csv とオブジェクト・ストレージ内の players.csv との間で MD5 ハッシュを比較します。player.csv ファイルのスニペットの例は、リスト 10 で確認することができます。2 つのハッシュが異なっている場合は、オブジェクト・ストレージ内の players.csv が優先されます。センチメント測定のそれぞれが、プレイヤー ID に関連付けられて Streams マップに入れられます。

リスト 10. テニス・プレイヤーの ATP (男子プロ・テニス協会) の ID と名前が含まれる player.csv ファイルのサンプル行
atpa479,nicolas almagro
atpa596,pablo andujar
atpa678,kevin anderson
atpb678,michael berrer

カスタム SPL オペレーターは次に、aggregatedSentiments、PlayerCSV マップ、そしてプレイヤー・マップを消去する必要がある場合にシグナルを送るタプルを受け取ります (リスト 11 を参照)。

リスト 11. 3 つの入力ストリームから 1 つの出力ストリームを作成する SPL の例
    stream<rstring ID,ustring playerName, int32 totMentWind, float64 posSentWindPercent,
            int32 totMentTour, float64 posSentTourPercent> playerIDEnrich = Custom(CleanPlayerMap;PlayerCSV;aggregatedSentiments) {

        logic state : {mutable map<rstring,rstring> PlayerMap; }

        onTuple CleanPlayerMap : {
            clearM(PlayerMap);
        }

        onTuple PlayerCSV : {
            insertM(PlayerMap, PlayerCSV.name, PlayerCSV.id);
        }
        
        
        onTuple aggregatedSentiments : {
            //lookup and submit
            rstring name = (rstring)aggregatedSentiments.playerName;
            printStringLn("Name: " +name) ;
            if (name in PlayerMap){
                 mutable rstring id = PlayerMap[name];            
                submit({ID=id,playerName=aggregatedSentiments.playerName,totMentWind=aggregatedSentiments.totMentWind,posSentWindPercent=aggregatedSentiments.posSentWindPercent,totMentTour=aggregatedSentiments.totMentTour,posSentTourPercent=aggregatedSentiments.posSentTourPercent},playerIDEnrich);
            }
            else{
                submit({ID="",playerName=aggregatedSentiments.playerName,totMentWind=aggregatedSentiments.totMentWind,posSentWindPercent=aggregatedSentiments.posSentWindPercent,totMentTour=aggregatedSentiments.totMentTour,posSentTourPercent=aggregatedSentiments.posSentTourPercent},playerIDEnrich);
                //appTrc(Sys.info,"No match in map for player name: " + name);
                appTrc(Trace.info,"No match in map for player name: " + name);
            }
        }
         onPunct aggregatedSentiments : {
                 submit(Sys.WindowMarker,playerIDEnrich);
             }
        
        }

playerIDEnrich ストリームは JSON タプルに変換されて JSON ストリームにサブミットされます。このJSONStream が複合ストリームに入力されます (リスト 12 を参照)。最終的な JSON データは、Swift API を使用した ObjectStorePersist という名前のカスタム Java オペレーターによってオブジェクト・ストレージに書き出されます。

リスト 12. JSONStream は、テキストをオブジェクト・ストレージに保管する作業を行う複合 SPL フローに入力されます
stream<rstring jsonData> JSONStream = TupleToJSON(JSONConverter as I){}
                                
        () as proc = ObjectStorePersistence(JSONStream as J) {
        param 
            //the tuple field that has json data
            storeText : "jsonData";
        }
namespace com.ibm.incubator.rs ;

use ObjectStorage.com.ibm.store::ObjectStorePersist ;

composite ObjectStorePersistence (input feed){

param
    // Filed in tuples from input feed, containing json text to process
    expression<rstring> $storeText ;
    expression<rstring> $objectStoreConfigPath : getThisToolkitDir() + "/ObjectStorage/properties/"+getSubmissionTimeValue("storeConfigFile","objectStore.dallas.properties") ;
    expression<rstring> $container : getSubmissionTimeValue("storeContainer","wwsm") ;
    expression<rstring> $objectName : getSubmissionTimeValue("storeObjectName","SocialSentiment.json") ;
    expression<rstring> $storeVersion : getSubmissionTimeValue("storeVersion","v1") ;
graph

 () as TupleStore = ObjectStorePersist(feed) {
    param 
        storeConfigFile : $objectStoreConfigPath;
        storeContainer : $container;
        storeObjectName : $objectName;
        jsonData: $storeText;
        storeVersion: $storeVersion;
}
}

リスト 13 に、JSON フォーマットの出力ソーシャル・センチメントを示します。

リスト 13. 最終的なソーシャル・センチメント JSON ファイルの例
{
  "players" : [ {
    "total" : 33841,
    "recentPositive" : 0.0,
    "name" : "yi fan xu",
    "id" : "wta312280",
    "totalPositive" : 80.0,
    "recent" : 10
  }, {
    "total" : 3790,
    "recentPositive" : 0.0,
    "name" : "jeremy chardy",
    "id" : "atpca12",
    "totalPositive" : 97.0,
    "recent" : 0
  }, {
    "total" : 85,
    "recentPositive" : 0.0,
    "name" : "malek jaziri",
    "id" : "atpj267",
    "totalPositive" : 100.0,
    "recent" : 0
  }, {
    "total" : 93,
    "recentPositive" : 0.0,
    "name" : "magda linette",
    "id" : "wta315130",
    "totalPositive" : 100.0,
    "recent" : 0
  }, {
    "total" : 364,
    "recentPositive" : 0.0,
    "name" : "ekaterina makarova",
    "id" : "wta311604",
    "totalPositive" : 88.0,
    "recent" : 0
}],
  "lastUpdate" : 1447272246065
}

出力のコンシューマーの 1 つは、図 3 に示すソーシャル・スコアボードです。

図 3. ソーシャル・センチメント JSON ファイルのコンシューマーがソーシャル・スコアボードを生成します
ソーシャル・スコアボードを示す図
ソーシャル・スコアボードを示す図

IBM Content Analytics

特定のプレイヤーに関するツイートのソーシャル・センチメント分析を行うには、広範な辞書ベースが必要です。IBM Content Analytics 内で作成される UIMA パイプライン構成は、ドキュメント言語の選択、字句解析、解析ルール、およびクリーンアップからなります。スポーツのセンチメント測定では、私たちがベースとして選択したのは英語です。字句解析の専門家が定義しているテキスト・トークン化のための分割ルールには、品詞のタグが付けられます。語彙辞書として、ドメイン固有のカスタム辞書が定義されています。

図 4. IBM Content Analytics のソーシャル・センチメント分析用 UIMA パイプライン構成
UIMA パイプライン構成を示す画面のスクリーン・キャプチャー
UIMA パイプライン構成を示す画面のスクリーン・キャプチャー

スポーツ・イベントごとに、英語と関連付けられている 2 つの辞書が作成されています。プレイヤーの情報は、en-TennisPlayers という名前のローカル・データベースにインポートされます。何種類かの異なる表示形式の名前が、通常の形式に追加されて、品詞 (この場合は、名詞) と関連付けられています。プレイヤー辞書のコンパイル時に、プレイヤー名のアノテーション詳細を指定する UIMA タイプのシステム辞書が生成されます。スポーツ・プレイヤーの名前の表示形式と一致するトークンのそれぞれに、DictTennisPlayers でアノテーションが付けられます。2 つ目の辞書の大半は、トークンにポジティブまたはネガティブのラベルを付ける形容詞の定義からなります。例えば、「OK」という単語は、O.K. の変化形と一緒にポジティブな形容詞として分類されます。トークンには、さらに triggerType という名前のフィーチャーが追加されます。トークンは複数のフィーチャーを持つことができます。単語の POS およびセグメンテーションをトークンに拡張するために、2 つの組み込みディレクトリーも追加で組み込まれます。

リスト 14. テニス・プレイヤー辞書用の UIMA タイプのシステム
<?xml version="1.0" encoding="utf-8"?>
    <typeSystemDescription xmlns="http://uima.apache.org/resourceSpecifier">
  <name>en-XX-TennisPlayers-ts</name>
  <description>en-XX-TennisPlayers</description>
  <vendor></vendor>
  <version></version>
  <types>
    <typeDescription>
      <name>com.ibm.usopen.DictTennisPlayers</name>
      <description>en-XX-TennisPlayers</description>
      <supertypeName>uima.tt.DictionaryEntryAnnotation</supertypeName>
    </typeDescription>
    </types>
</typeSystemDescription>

フィーチャーとアノテーションのストリームは、解析ルールのステージに入力されます。このステージには、2 つのルール・ベースのセマンティックな解釈が定義されています。

  • 「オピニオン・セマンティック分析」は、NegativeIndicator、PositiveIndicator、または NeutralIndicator を生成するために、いくつかのルールを提供しています。実際のアトミックなルール・アノテーターには、一連のトークン (ポジティブな triggerType の DictTrigger、またはネガティブな triggerType の DictTrigger) が含まれています。例えば、値 n't の後に 0 から 2 つのトークンが続き、ネガティブな triggerType であると結論付けられたりします。
  • 「集計」ルールは、NegativeIndicator と PositiveIndicator の間の関係を引き出します。例えば、2 つの PositiveIndicator の後に 1 つの NegativeIndicator が続いている場合、全体としてはポジティブな応答になります。一方、1 つの PositiveIndicator の後に 1 つの NegativeIndicator が続いている場合は、全体としては中立的な応答になります。

続いて、センテンスまたはツイートに対するアノテーションの累積は、TennisPlayerSentiment という名前の最終的なセンチメント・アノテーターに渡されます。4 つのルールが以前のアノテーションのさまざまな順序を検索し、これらのルールによって TennisPlayerPositiveSentiment または TennisPlayerNegativeSentiment が生成されます。

  • DictTennisPlayer[player token] の後に 0 から 7 つの他のトークンが続き、最後に PositiveIndicator[lemma word] が続く場合、結果は TennisPlayerPositiveSentiment になります。
  • PositiveIndicator[lemma word] の後に 0 から 3 つのトークンが続き、最後に DictTennisPlayer[player token] が続く場合、結果は TennisPlayerPositiveSentiment になります。
  • DictTennisPlayer[player token] の後に 0 から 7 つの他のトークンが続き、最後に NegativeIndicator[lemma word] が続く場合、結果は TennisPlayerNegativeSentiment になります。
  • NegativeIndicator[lemma word] の後に 0 から 3 つのトークンが続き、最後に DictTennisPlayer[player token] が続く場合、結果は TennisPlayerNegativeSentiment になります。

UIMA パイプラインは、語彙辞書と解析ルールのすべてと併せて、UIMA PEAR ファイルにエクスポートされます。この PEAR ファイルが、UIMA Java API を介して IBM InfoSphere Streams マシンにインストールされます。Streams から渡されたタプルは、カスタム UIMA オペレーター用に抽出され、このオペレーターでテキストが UIMA の CAS (Common Analysis Structure: 共通分析構造) の中にラップされてインストール済み UIMA PEAR ファイルに入力されます。すると、UIMA の CAS からアノテーション、TennisPlayerNegativeSentiment、または TennisPlayerPositiveSentiment の出力が抽出されて、ダウンストリーム処理のために Streams タプルに戻されます。

サーバー・アクセス・ログの集計

リアルタイムのログ・ストリームは、プレディクティブ・クラウドに対し、予期せぬトレンドの予測を調整する上で必要な、(私たちが分析対象としている) サイトのヒット・レートを提供します。Python スクリプトがすべての IBM HTTP Server アクセス・ログをモニターして、RabbitMQ にメッセージを送信します。コンシューマー側では、図 5 に示されているログ・アグリゲーター・ジョブが RabbitMQ からメッセージをプルします。リスト 15 に、rstring を使用して RabbitStream ストリームを生成するカスタマー RabbitSource オペレーターを示します。RabbitMQ カスタム・オペレーターがタプルを取り込んだり生成したりするには、その前に allPortsReady() メソッドが呼び出されます。このメソッドは RabbitMQ API を使用して、指定されたメッセージ・キューに接続します。RabbitMQ と ジョブ 4 との間のセキュアなメッセージ送信をサポートするには、あらかじめ鍵ストアとトラストをセットアップしなければなりません。RabbitStream は、SPL 内で定義されているすべての処理エンジンに使用可能になります。

RabbitStream のコンシューマーの 1 つは、ログの行を毎分集計して、処理された行数と時刻を提供します。LogLineStream ストリームへの出力となる時刻のタプル要素は、別のカスタム Java オペレーターによってタプルに変換されます。サーバー・アクセスの時刻と合計数が含まれる出力 JSON コンテンツは、DB2 に保管するために RESTful サービスに送信されます。それと並行して、RabbitStream がカスタム SPL ファンクターに入力されて、メッセージのタイム・スタンプが解析された後、タプルが集計されて GraphiteLogLine ストリームに送られます。UDPSink は処理された行数の合計を、UDP リクエストを受け取る時系列データベース Graphite に送信します。

図 5. ログ・アグリゲーターのストリーム計算ジョブの全体的な処理要素フロー
ストリーム・ジョブの全体的な処理要素フローを示す図
ストリーム・ジョブの全体的な処理要素フローを示す図
リスト 15. RabbitMQ に接続するカスタム・ストリーム・オペレーターの Java コード
public class RabbitSource extends AbstractOperator implements Operator {
    private final static Logger logger = Logger.getLogger(RabbitSource.class.getName());
    private Channel channel;
    private ExecutorService executor;
    private String queueName;
    private int port;
    private boolean durable = true;
    private boolean shutdown = false;
    private boolean subscription = true;
    private final LinkedList<String> lastMessages = new LinkedList<String>();

    @Override
    public void allPortsReady() throws Exception {
        connect();
    }

    public void connect() throws Exception {
        logger.info("Starting RabbitMQ Consumer");
        if (subscription) {
            logger.info("Creating Subscription Consumer");
            channel.basicQos(1000);
            channel.basicConsume(queueName, false, new SubscriptionConsumer());
        } else {
            Thread t = getOperatorContext().getThreadFactory().newThread(new PollingConsumer());
            t.start();
            logger.info("Creating Polling Consumer");
        }
        logger.info("Started RabbitMQ Consumer Successfully");    
} ….

Bluemix でのアーキテクチャーとデプロイメント

WWSM のソーシャル・スコアボードは、自然言語処理アルゴリズム、ストリーミング・テクノロジー、ツイートなどによって生成されたソーシャル・センチメント・データを JSON フォーマットで取り込みます。スポーツ・プレイヤーに対するソーシャル・センチメントを計算するプラットフォームは、Bluemix、SoftLayer、イベント・インフラストラクチャーが含まれるハイブリッド・クラウドに分散されています。このサービスは、2 つのアクティブ・サイトと 1 つのスタンバイ・サイトで継続的可用性が確保されています。Bluemix と SoftLayer の両方を対象とした 2 つのアクティブ・サイトは、ダラスとロンドンにあります。SoftLayer を対象としたスタンバイ・サイトはメルボルンにある一方、Bluemix を対象としたスタンバイ・サイトはシドニーとロンドンに分けられています。ソーシャル・センチメントをサポートするモニタリングとデプロイメントの側面は、地理的に分散されたプライベート・クラウド内で実行されます。

図 6. プライベート・クラウド、Bluemx、および SoftLayer で継続的可用性が確保されたソーシャル・センチメント・サービス
継続的可用性が確保されたソーシャル・センチメント・サービスを示す図
継続的可用性が確保されたソーシャル・センチメント・サービスを示す図

図 6 には、Bluemix、SoftLayer、プライベート・クラウドの各クラウド・プラットフォーム内で実行されているテクノロジーが示されています。イベント・インフラストラクチャーとコンテンツ・チームは独自のプライベート・クラウド・プラットフォームでソーシャル・センチメント・サービス・アプリケーションから提供されるコンテンツを管理し、使用しています。モニタリング用 JAR (Java アーカイブ) プログラムは、Java 1.8 仮想マシン内で実行されます。モニタリング・アプリケーションによって、以下の 3 種類の問題が検出されます。

  • ソーシャル・センチメントが構成可能な分数以内に更新されない場合、アラートがスローされます。
  • Bluemix Streams (Streaming Analytics) または SoftLayer のオブジェクト・ストレージの RESTful インターフェースが失敗した場合、アラートがスローされます。
  • Bluemix Streams が正常に動作していない場合、アラートがスローされます。

Streams アプリケーション・バンドルは、リスト 16 とリスト 17 の Ant と Maven の組み合わせによってビルドされて、適切な Bluemix 地域にデプロイされます (図 6 を参照)。「social1」という名前の Streams ジョブが Twitter に接続し、ツイートをストリーミング・アプリケーションのジョブ 0 (リスト 1 を参照) にプルします。social1 用には特定の SAB ファイルをビルドしてデプロイします。「social2」という名前の Streams ジョブは、ラテン文字フィルター、自然言語を解釈する UIMA PEAR ファイル、時間枠での集計、および処理並列化に適用されます。このジョブは、図 2 のジョブ 3 と同じような形で SoftLayer のオブジェクト・ストレージとインターフェースを取ります。2 番目の SAB ファイルは、ビルドされると Bluemix Streams にデプロイされます。social1 と social2 の 2 つのジョブは、SPL グラフによって互いに接続されます。イベントの開催期間中、ダラスとロンドンのアクティブ・サイトは、計画された停止も計画されていない停止も起きることなく、継続的に利用可能でなければなりません。これらのサイトのいずれかでサービス停止が予定された場合は、オーストラリアとロンドンのスタンバイ Bluemix サイトをアクティブにすることができます。

リスト 16. Maven プラグインによって Ant に接続する「social2」ジョブを作成する Maven ビルド・ファイル
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ibm.incubator.rs</groupId>
    <artifactId>RacketStream</artifactId>
    <version>2.0.0</version>
    <name>RacketStream</name>
    <description>Project for RacketStream</description>
    <packaging>pom</packaging>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <execution>
                        <id>compile-spl</id>
                        <phase>install</phase>
                        <configuration>
                            <tasks>
                                <property name="compile_classpath" refid="maven.compile.classpath" />
                                <property name="outputDir" value="${project.build.outputDirectory}" />
                                <property name="sourceDir" value="${project.build.sourceDirectory}" />
                                <ant antfile="${basedir}/spl-compile.xml" target="all" />
                            </tasks>
                        </configuration>
                        <goals>
                            <goal>run</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <modules>
        <module>RacketStreamJava/</module>
    </modules>
</project>
リスト 17. 「social2」ジョブの作成をサポートする Ant ビルド・ファイル
<?xml version="1.0" encoding="UTF-8"?>
<!-- Example of using generating a single SPLDOC set from
   a number of toolkits. In this case all the InfoSphere Streams
   product toolkits, all toolkits under $HOME/toolkits and Vwap.
 -->
<project name="RacketStream" default="compile">

    <property environment="env"/>
    <property name="streams.install" value="${env.STREAMS_INSTALL}"/>
    <taskdef name="splcompile" classname="com.ibm.ssb.ant.sc.ScTask">
        <classpath>
            <pathelement location="ant/com.ibm.ssb.ant.jar"/>
        </classpath>
    </taskdef>

    <property name="mainComposite" value="application::LogAggregator"/>

    <!-- Remove the output directory created by the SPL compile -->
    <target name="clean">
        <delete dir="${basedir}/output" quiet="true"/>
        <delete dir="${basedir}/packaged" quiet="true"/>
        <delete file="${basedir}/${ant.project.name}.tar.gz" quiet="true"/>
        <delete file="${basedir}/${ant.project.name}.tar" quiet="true"/>
    </target>

    <!-- Compile the application, runs sc -->
    <target name="compile">
        <splcompile mainComposite="com.ibm.incubator.rs::GNIPRacketStream" />
    </target>

    <!-- Package the compiled application -->
    <target name="package">
        <mkdir dir="${basedir}/packaged"/>
        <copy todir="${basedir}/packaged/application">
            <fileset dir="${basedir}/output"/>
        </copy>
        
        <copy todir="${basedir}/packaged/data">
            <fileset dir="${basedir}/data"/>
        </copy>

        <package_operator operator-name="RacketStreamJava" package-name="com.ibm.incubator.rs"/>
        
        <mkdir dir="${basedir}/packaged/JSONHelpers"/>
        <copy todir="${basedir}/packaged/JSONHelpers">
            <fileset dir="${basedir}/JSONHelpers"/>
        </copy>
        
        <mkdir dir="${basedir}/packaged/ObjectStorage"/>
        <copy todir="${basedir}/packaged/ObjectStorage">
             <fileset dir="${basedir}/ObjectStorage"/>
        </copy>
        
        <copy todir="${basedir}/packaged/">
            <fileset dir="${basedir}/" includes="*.properties"/>
        </copy>
        
        <tar destfile="${basedir}/${ant.project.name}.tar" basedir="packaged/"/>
        <gzip destfile="${basedir}/${ant.project.name}.tar.gz" src="${basedir}/${ant.project.name}.tar"/>
    </target>
    
    <macrodef name="package_operator">
        <attribute name="operator-name" />
        <attribute name="package-name" />
        
        <sequential>
            <mkdir dir="${basedir}/packaged/@{operator-name}"/>
            <copy todir="${basedir}/packaged/@{package-name}">
                <fileset dir="${basedir}/@{package-name}"/>
            </copy>
            <copy todir="${basedir}/packaged/@{operator-name}/target">
                <fileset dir="${basedir}/@{operator-name}/target" includes="**/*.jar"/>
            </copy>
            <copy file="${basedir}/toolkit.xml" tofile="${basedir}/packaged/toolkit.xml"/>
            <copy file="${basedir}/info.xml" tofile="${basedir}/packaged/info.xml"/>
        </sequential>
    </macrodef>
    
    <target name="all" depends="clean,compile,package"></target>
</project>

SoftLayer クラウド・プラットフォームは、「オブジェクト・ストレージ」というデータ・ストレージ機能を提供しています。SoftLayer オブジェクト・ストレージの地理的位置は、パフォーマンス、アクセスのしやすさ、そして可用性を考慮して、Bluemix Streams の地理的位置に合わせて選択されています。Bluemix Streams アプリケーションによる定期的ダウンロードのために、オブジェクト・ストレージには、図 4 に示したロジック、辞書、センチメント定義が含まれる PEAR ファイルがアップロードされます。スポーツ・プレイヤーの名前と ID のコードが含まれるプレイヤーのカンマ区切り (CSV) ファイルも、Bluemix Streams アプリケーションよって、構成可能な間隔でダウンロードされます。Bluemix Streams アプリケーションの出力は、JSON ファイルの形式で SoftLayer オブジェクト・ストレージ・コンテナーにアップロードされます。

更新された sentiment.json ファイルをプルするために、オブジェクト・ストレージの JAR ファイルがコンテンツ・チームのプライベート・クラウド内で実行されます。プルされたファイルがソーシャル・スコアボード・アプリケーションによって解析されることで、図 3 に示すビューが生成されます。接続プロパティー・ファイルには、オブジェクト・ストレージに接続するためのパスワード、URL、コンテナー、ユーザー ID などが含まれています。オプションの暗号化 JAR ファイルを実行することで、すべてのパスワードを暗号化してディスクに保管することができます。

図 7. ソーシャル・センチメントの計算をサポートする、ハイブリッド・クラウドの機能フロー
クラウドの機能フローを示す図
クラウドの機能フローを示す図

イベント・インフラストラクチャー・プライベート・クラウドは、SoftLayer オブジェクト・ストレージのリージョンすべてにわたって負荷を分散するための GSLB (Global Server Load Balancer) を構成します。モニタリングを含め、オブジェクト・ストレージへのアクセスは常に GSLB 経由で行われます。GSLB は HTML エラー・コードも検出し、必要に応じてトラフィックを再ルーティングすることができます。この機能がさらに、継続的可用性を備えたサービスをサポートします。

まとめ

今回のチュートリアルでは、IBM がプロ・ゴルフ・トーナメントとプロ・テニス・トーナメントのプレゼンテーション用に導入したストリーミング・コンピューティング・アーキテクチャーの概要について説明しました。具体的には、IBM InfoSphere Streams と、Java 言語でサポートされた SPL (Streams Processing Language) のカスタマイズの使用法を説明しました。また、Streams の要素の処理における UIMA PEAR ファイルの実装と、この実装によってスポーツ・ソーシャル・スコアボードのアノテーションを提供する方法についても説明しました。そして開発者向けのサンプルとして、いくつかの例と具体的なコード・ブロックを示しました。

このシリーズの第 4 回では、保留状態のデータを Hadoop、IBM InfoSphere BigInsights、Web クローラーなどのテクノロジーを使用して処理する方法について説明します。


ダウンロード可能なリソース


関連トピック


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=ビジネス・アナリティクス
ArticleID=1032257
ArticleTitle=プロ・ゴルフとプロ・テニスのためのプレディクティブ・クラウド・コンピューティング: 第 3 回 処理中のビッグデータ
publish-date=06092016