IBM®
本文へジャンプ
    Japan [変更]    ご利用条件
 
 
検索範囲検索:    
    ホーム    製品    サービス & ソリューション    サポート & ダウンロード    マイアカウント    
skip to main content

developerWorks Japan  >  WebSphere  >

WebSphere Message Broker V6.1 における Collector ノードの使用

developerWorks
ページオプション

JavaScript を要するドキュメントオプションは表示されません

原文はこちら

原文はこちら


レベル: 中級

Andrew Coleman (andrew_coleman@uk.ibm.com), アドバイザリー・ソフトウェア・エンジニア、WebSphere Message Broker Developmentチーム, IBM

2008年 05月 15日

新しい WebSphere Message Broker V6.1 の Collector ノードは、さまざまな構成可能パラメーターに従ってメッセージのグループを収集し、さまざまなときにブローカーを通過するその他のメッセージを相互参照できる、新しいメッセージ処理シナリオを有効にします。この記事では、簡単なオンライン注文の例を使用して、この新しい Collector ノードの使用法を説明します。

概要

IBM® WebSphere® Message Broker は、あるエンドポイントから別のエンドポイントにメッセージを送付して変換することによって、アプリケーションを統合します。通常のプロセスは「1 つのメッセージを入力して、1 つのメッセージを出力」し、各メッセージ内の情報はフィルター (一部の情報を除去)、追加 (データベースなどから情報を追加)、または変換 (あるフォーマットから他のフォーマットに変換) することができます。大きなメッセージを小さなメッセージに分解するか、メッセージを多くのサブスクライブしているアプリケーションにパブリッシュすることにより、1 つの入力メッセージが処理されて、いくつかの出力メッセージが生成されることがあります。

WebSphere Message Broker V6.1 は、複数の入力メッセージを 1 つのメッセージ・コレクションにまとめて、そのコレクションを 1 つのエンティティーとして処理する汎用メカニズムを提供します。このため、メッセージ・コンシューマーが、複数のメッセージ・プロデューサーからの情報を必要とするメディエーションを作成できます。この論文では、簡単なオンライン注文のシナリオを使用してこの概念を紹介し、新しい Collector ノードを使用してこれらのメッセージ・コレクションを作成する方法について説明します。

単純なイベント・ドリブン・アプリケーション

メッセージ・コレクションの概念を説明するために、以下の単純化されたオンライン注文システムを考えます。2 つの部門が注文の取り扱いを管理します。

  • Admin -- WebSphere Message Broker を使用して注文と請求を取り扱います。この部門はクレジット・カード・データを取り扱い、信頼されていない人がこの情報にアクセスできないように個人情報を保守します。
  • Warehouse -- 項目、数量、メール・アドレスを含む注文を受け、注文をパッケージしてディスパッチします。ディスパッチされたときに、MQ メッセージが Admin 部門 (DELIVERY キュー) に送り返されます。この部門の従業員がクレジット・カード情報や電子メール・アドレスを見ることはありません。

図 1 Web 注文を処理するためのメッセージ・フロー
Web 注文を処理するためのメッセージ・フロー

以下のセクションでは、Web 注文を処理するメッセージ・フローについて説明し、Collector ノードを使用してさまざまな入力から得られるメッセージの処理を調整する方法を示します。

複数の入力

このメッセージ・フローは 2 つのソースから入力を受信します。

  • HTML 注文フォームとして Web ページから
  • Warehouseからのディスパッチ・メッセージを含む MQ キューから

メッセージ・フローは Web ページから注文を受信し、注文からデータを抽出し、データを Warehouseに送信します。Warehouse は受信してから 3 時間以内に注文をディスパッチし、ディスパッチ・メッセージをフローに送り返します (2 番目の入力)。メッセージ・フローはディスパッチ・メッセージと元の注文メッセージを関連付け、顧客への電子メール通知を生成します。注文してから 3 時間以内にディスパッチ・メッセージをWarehouse から受信しないと、メッセージを障害キューに配置することにより、サービス契約違反として遅延のフラグが立てられます。


図 2 フローへの最初の入力は HTML 注文フォームです
HTML 注文フォーム

メッセージは POST 要求として HTTPInput ノード (Web order) に送信され、BLOB パーサーを使用して JavaCompute ノード (Process order) にapplication/x-www-form-urlencoded ストリングとして渡されます。図 2 に示す入力データのストリングは以下のとおりです (改行を無視します)。

name=Andrew&address=IBM+Hursley+Park%2C+Winchester&email=andrew_coleman%40uk.ibm.com
&item=Power+Supply&quantity=1&creditcard=9999123412341234&expiry=09%2F09&Submit=Submit

以下の Java コードは、このメッセージをどのように処理して変換できるかを示します。

import java.util.*;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.*;

public class ProcessOrder extends MbJavaComputeNode
{
  private int orderNumber = 10000;

  public void evaluate(MbMessageAssembly inAssembly) throws MbException
  {
    MbOutputTerminal out = getOutputTerminal("out");
    MbOutputTerminal alt = getOutputTerminal("alternate");

    MbMessage inMessage = inAssembly.getMessage();

    MbElement blobElement = 
            inMessage.getRootElement().getFirstElementByPath("/BLOB/BLOB");
    String request = new String((byte[])blobElement.getValue());
    // extract order information
    Map<String, String=> queryTable = new Hashtable<String, String=>();
    String fields[] = request.split("&");
    for(String field : fields)  // iterate over the array
    {
      String keyValue[] = field.split("=");  // extract name/value pairs
      queryTable.put(keyValue[0], keyValue[1]);  // populate hash table
    }

    // now extract the information we need
    String item = queryTable.get("item");             // item
    String quantity = queryTable.get("quantity");     // quantity
    String name = queryTable.get("name");             // name
    String address = queryTable.get("address");       // address
    String email = queryTable.get("email");           // email
    String creditcard = queryTable.get("creditcard"); // credit card number
    String expiry = queryTable.get("expiry");         // credit card expiry date

    orderNumber++; // increment the order number

    MbMessage processMessage = new MbMessage();
    MbMessageAssembly processAssembly = new MbMessageAssembly(inAssembly, processMessage);
    MbElement processRoot = processMessage.getRootElement();
    copyMessageHeaders(inMessage, processMessage);

    // copy the request identifier into the reply identifier (required for HTTPReply node)
    MbElement localEnv = inAssembly.getLocalEnvironment().getRootElement();
    MbElement requestId = 
            localEnv.getFirstElementByPath("/Destination/HTTP/RequestIdentifier");
    processRoot.getFirstElementByPath("/Properties/ReplyIdentifier")
            .setValue(requestId.getValue());

    // build the 'admin' message
    MbElement body = processRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
    MbElement doc = body.createElementAsLastChild(MbXMLNSC.FOLDER, "order", null);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "orderID", orderNumber);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "item", item);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "quantity", quantity);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "name", name);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "address", address);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "email", email);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "creditcard", creditcard);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "expiry", expiry);

    out.propagate(processAssembly);

    // build the 'warehouse' message (restricted information)
    MbMessage warehouseMessage = new MbMessage();
    MbMessageAssembly warehouseAssembly = 
            new MbMessageAssembly(inAssembly, warehouseMessage);
   MbElement warehouseRoot = warehouseMessage.getRootElement();
    // copy properties header
    warehouseRoot.addAsFirstChild(inMessage.getRootElement().getFirstChild().copy());
    body = warehouseRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
    doc = body.createElementAsLastChild(MbXMLNSC.FOLDER, "order", null);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "orderID", orderNumber);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "item", item);
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "quantity", quantity);
    String mailingLabel = name + "\n" + address;
    doc.createElementAsLastChild(MbXMLNSC.FIELD, "mailingLabel", mailingLabel);

    alt.propagate(warehouseAssembly);
  }

  public void copyMessageHeaders(MbMessage inMessage, MbMessage outMessage)
  throws MbException
  {
    MbElement outRoot = outMessage.getRootElement();

//  iterate though the headers starting with the first child of the root element
    MbElement header = inMessage.getRootElement().getFirstChild();
    while (header != null && header.getNextSibling() != null) 
            // stop before the last child (body)
    {
      // copy the header and add it to the out message
      outRoot.addAsLastChild(header.copy());
      // move along to next header
      header = header.getNextSibling();
    }
  }

}

JavaCompute ノードは、この URLEncoded ストリングを別々の名前/値ペアに分割して、ハッシュ・テーブルにデータを設定します。次に 2 つのメッセージを作成します。

  • 最初のメッセージは Web ブラウザーに HTML 応答を生成するために使用されて、Collector ノードにも渡されます。
  • 2 番目のメッセージはウェアハウスに送信され、クレジット・カード・データや電子メール・アドレスを除く、注文をディスパッチするのに必要な情報のみが含まれます。

最初の応答

POST 要求を送信した Web ページはタイムリーな応答を期待します。通常、この応答は XSLT を使用して着信 XML メッセージから生成できる HTML 形式です。以下のスタイルシートで応答をフォーマットします。これは XSLT ノードによって実行されます (Generate HTML)。スタイルシートからの出力は、Web ブラウザーに送信するために HTTPReply ノードに伝搬されます (Order confirmation)。

<?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?=>
<xsl:stylesheet xmlns:xsl=&quot;http://www.w3.org/1999/XSL/Transform&quot;
    version=&quot;1.0&quot;
    xmlns:xalan=&quot;http://xml.apache.org/xslt&quot;=>
<xsl:output method=&quot;html&quot;/=>

  <xsl:template match=&quot;/&quot;=>
    <html=>
      <head=>
        <title=>Order confirmation</title=>
      </head=>
      <body=>
        <p=>
          Dear <xsl:value-of select=&quot;/order/name&quot; /=>,
        </p=>
        <b=>Thank you for your order.</b=>
        <p=>
          It has been sent to our warehouse and will be
          dispatched within 3 hours. You will receive an email
          when it is dispatched.
        </p=>
        <p=>
          Order reference number:
          <xsl:value-of select=&quot;/order/orderID&quot; /=>
        </p=>
      </body=>
    </html=>
  </xsl:template=>

</xsl:stylesheet=>

注文およびディスパッチ・メッセージの収集と関連付け

Warehouse がディスパッチ・メッセージを生成すると、電子メールを顧客に送信しなければなりません。顧客の電子メール・アドレスは、すでにメッセージ・フローを通過した元の注文メッセージの一部でしたが、Warehouse に送信された MQ メッセージには含まれていませんでした。イベント処理システムでは、イベントがその他のイベントのコンテキストでのみ意味を持つ状況がよくあります。この場合、元の注文メッセージにアクセスしながらディスパッチ・メッセージを処理する必要があります。Collector ノードは以下の場合に役立ちます。ノードは 2 つの動的な入力ターミナルで構成されます。図 3 に示すように、1 つは元の注文メッセージを受信し、もう 1 つは Warehouse からディスパッチ・メッセージを受信します。


図 3 ノードの右クリックによる、Collector ノード上の動的な入力ターミナルの作成
ノードの右クリックによる、Collector ノード上の動的な入力ターミナルの作成

ノード・プロパティーは、これらのメッセージがどのようにグループ化されるかを定義します。各入力ターミナルの数量プロパティーは 1 に設定されます (詳細については、図 4 を参照してください)。数量を設定すると、その数量に達するまでコレクション・メッセージは伝搬されません。この場合、それは各タイプの 1 つのメッセージの数量です。また、共通の orderID に従って着信メッセージを関連付ける必要があります。各ターミナルの Correlation path (相関パス)プロパティーは、orderID フィールドを含むそれぞれのメッセージ内のロケーションに設定されます。ロケーションを決定するための表現は、XPath 1.0 構文を使用して表わされます。


図 4 Collector ノード・プロパティー:各入力ターミナルに対してテーブルに 1 行が作成されます
Collector ノード・プロパティー

Collector ノードによって生成されたコレクション・メッセージは、以下の図 5 に示す構造を持ちます。メッセージ本文は Collection という名前のフォルダーであり、CollectionName という名前の属性を持ち、その値は、Collector ノードのプロパティー、Collection name で指定されます。コレクションの各メッセージは、メッセージが到着した入力ターミナルにちなんで名前が付けられたフォルダーの下にサブツリーとして格納されます。


図 5 コレクション・メッセージの構造
コレクション・メッセージの構造

メッセージのグループ化およびバッチ処理

Collector ノードはメッセージ・コレクションを定義するための柔軟性のあるメカニズムで、メッセージをグループ化するための 2 つの共通パターンを備えています。

  • Grouping -- 各入力メッセージ・タイプに対して、新しい動的な入力ターミナルを Collector ノードに追加することにより、さまざまなソースからのさまざまなフォーマットのメッセージを同じコレクションに組み合わせることができます。この記事に示すシナリオにはこの手法を使用します。通常、次のセクションに説明するように、メッセージ・コンテンツの特定の部分をマッチングすることにより、メッセージを関連付ける必要があります。
  • Batching -- 単一入力ターミナルから複数のメッセージを収集して、より大きなバッチ処理メッセージを作成することで、単一ソースからのコレクションを作成できます。これは、大きなメッセージが小さなコンポーネント・メッセージに分割されるメッセージ分解の逆です。Collector ノード上の 2 つのパラメーター (入力ターミナルごと) によって、メッセージのバッチ処理が制御されます。
    • Quantity (数量) -- この入力ターミナルに到着するメッセージのいくつを各コレクションが受け入れるかを設定します。ゼロ (または設定解除) は無限を意味します (Timeout が有限である場合にのみ無限にできます)。デフォルト値は 1 です。
    • Timeout (タイムアウト) -- 最初のメッセージが到着してから、入力ターミナルがメッセージを受け入れる最大時間 (秒)。ゼロ (または設定解除) は無限を意味します (Quantity が有限である場合にのみ無限にできます)。Quantity および Timeout のいずれも有限であれば、これら 2 つの条件のうちの最初の条件が満たされると、イベント・ハンドラーが満足されます。デフォルト値は設定解除です (無限/タイムアウトなし)。

1 つのコレクション定義に、グループ化とバッチ処理手法の両方を組み合わせることができます。後で伝搬するために、完全なコレクションをバッチ処理することもできます。着信メッセージを受け取る動的な入力ターミナル以外に、Control という静的な入力ターミナルもあります。その目的は、外部リソースが Collector ノードから出力をトリガーできるようにすることです。詳細は、Event coordination プロパティー設定によって制御されます。

  • Disabled -- Control ターミナルへのメッセージは無視され、コレクションは完了したときに伝搬されます。これはデフォルト設定です。
  • All complete collections -- コレクションが完了すると、キューに保持されます。メッセージが Control ターミナルで受信されると、このキューのすべてのコレクションが出力ターミナルに伝搬されます。
  • First complete collection -- コレクションが完了すると、キューに保持されます。メッセージが Control ターミナルで受信されると、このキューの最初のコレクションが出力ターミナルに伝搬されます。この制御メッセージが到着したときにキューが空であれば、Collector ノードが保留状態になって次の完全なコレクションが使用可能になると直ちに出力ターミナルに伝搬されます。

Control ターミナルで受信したメッセージのコンテンツは検査されず、このメッセージは受信時に廃棄されます。Collection expire (コレクション有効期限)が切れた不完全なコレクションは、Event coordination プロパティーの設定にかかわらず Expire ターミナルに直ちに伝搬されます (『コレクションの有効期限』のセクションを参照してください)。

相関ストリングおよびワイルドカード

Collector ノードを最大限に活用するには、メッセージ相関を理解することが重要です。メッセージ相関がなければ、Collector ノードは単にメッセージを到着順にグループ化します。これが適したアプリケーションもありますが、多くのイベント・ドリブン・システムでは、メッセージ・ヘッダーのコンテンツまたはプロパティーに従ってメッセージをグループ化する必要があります。Collector ノードは、コレクション内のすべてのメッセージが同じ相関ストリングを、必ず持つようにします。相関ストリングは、相関パスと相関パターンという 2 つのプロパティーから計算されます。これらのプロパティーは動的な入力ターミナルごとに繰り返され、異なるフォーマットのメッセージをコレクションにグループ化できます。

相関パスは標準 XPath 1.0 表現であり、着信メッセージに対して評価され、ストリングが生成されます。ストリングは、単純にロケーション・パスが指すエレメントのストリング値にもできますが、数値とストリング演算子を含むさらに複雑な式を XPath で作成できます。相関パターンが構成されていれば、結果のストリングが相関パターンと比較されます。相関パターンはワイルドカード文字 * を 1 つだけ含まなければなりません。そのワイルドカードと一致するストリングの部分が、最終的な相関ストリングになります。例えば、相関パスがファイル名 data1.txt を含むメッセージのフィールドを指し、相関パターンが *.txt に設定された場合、相関ストリングは data1 になります。XPath 自体がストリングを操作する機能を持っていますが、このメカニズムはメッセージ・エレメントからサブストリングを抽出する簡単な方法です。

この相関ストリング * の値は、ノードの Collection name プロパティーで参照できます。このプロパティーの値は、CollectionName という属性の各コレクション・メッセージに付加されます。上記の例は、コレクション名 order-* がランタイム時にどのように order-10002 に展開されるかを示します。ワイルドカードの値は、LocalEnvironment の Wildcard フォルダー内の WildcardMatch というフィールドに書き込まれます。

コレクション・メッセージの処理

メッセージ・コレクションにはシリアライズされるフォームがないため、メッセージ・コレクションを直接出力ターミナルに伝搬することはできません。コンポーネント・メッセージは 2 つ以上のパーサーが所有できます (1 つは XML、もう 1 つはバイナリーにできます)。このため、コレクション・メッセージは、最初に ESQL または Java を使用して Compute ノードで変換しなければなりません。この例では、コレクションの両方の部分に含まれている情報を抽出し、顧客への電子メールを生成します。以下のコード断片は、コレクションから値を抽出するために Xpath を使用して JavaCompute ノード (Generate delivery note) でそれを行う方法を示します。

MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
MbElement outRoot = outMessage.getRootElement();

// extract required info from the collection
String orderNumber = 
 (String)inMessage.evaluateXPath(&quot;string(/delivery/XMLNSC/dispatch/orderID)&quot;);
String name = 
 (String)inMessage.evaluateXPath(&quot;string(/order/XMLNSC/order/name)&quot;);
String email = 
 (String)inMessage.evaluateXPath(&quot;string(/order/XMLNSC/order/email)&quot;);
String dispatchTime = 
 (String)inMessage.evaluateXPath(&quot;string(/delivery/XMLNSC/dispatch/timestamp)&quot;);

// create email header and setup To: and Subject: fields
MbElement header = outRoot.createElementAsLastChild(&quot;EMAILHDR&quot;);
header.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, &quot;To&quot;, 
        URLDecoder.decode(email, &quot;UTF-8&quot;));
header.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, &quot;Subject&quot;,
        &quot;Order &quot; + orderNumber + &quot; has been dispatched&quot;);

String content = &quot;Dear &quot; + name + &quot;,\n\nYour order was dispatched at &quot;
        + dispatchTime + &quot;.  The order number is &quot; + orderNumber;

// the body of the email is written as a BLOB
outRoot.createElementAsLastChildFromBitstream(content.getBytes(), MbBLOB.PARSER_NAME,
        null, null, null, 0, 0, 0);

メッセージは、指定された SMTP サーバーを介して送信するために、最終的に EmailOutput ノード (Notify customer) に伝搬されます。

コレクションの有効期限

メッセージを保持してコレクションが完了するまで待つことの問題は、必要なコンポーネント・メッセージの 1 つがタイムリーに到着しない可能性があることです。Collector ノードは、この事態が起こらないように保護するメカニズムを Collection expiry (コレクション有効期限)プロパティーの形で備えています。構成された場合、部分的にしか作成されていないコレクションは、このプロパティーに指定した時間を超えて保持されません。有効期限が切れると、それまでに受信したメッセージはメッセージ・コレクションに組み込まれ、Collector ノードの Expire ターミナルに伝搬されます。

このシナリオでは、この有効期限メカニズムを活用します。Warehouseはそれぞれの注文をディスパッチし、受信から 3 時間以内にメッセージを Admin メッセージ・フローに送り返します。Collector ノードの Collection expiry プロパティーを 3 時間 (10800 秒) に設定することにより、この条件を満たさない不完全なコレクションは Expiry ターミナルに送信されます。コレクションは、特別に取り扱うためのメッセージ・フローに送付できます。以下の Java コードは、Collector ノードによって生成された LocalEnvironment の WildcardMatch フィールドを抽出し、FAILURE MQ キューに入れるためのエラー・メッセージを生成します。

MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
MbElement outRoot = outMessage.getRootElement();

// incomplete collection - extract the orderID from the WildcardMatch field
String orderNumber = (String)inAssembly.getLocalEnvironment().getRootElement()
                          .evaluateXPath(&quot;string(Wildcard/WildcardMatch)&quot;);
String content = &quot;Order &quot; + orderNumber + 
        &quot; was not dispatched within agreed time limit&quot;;
outRoot.createElementAsLastChildFromBitstream(content.getBytes(), MbBLOB.PARSER_NAME,
                                              null, null, null, 0, 0, 0);

結論

この記事では Web ベースの注文システムを例として使用して、WebSphere Message Broker V6 の Collector ノードの概要を示し、メッセージ・フローがさまざまなときにさまざまなコンテキストでフローを通過するメッセージを相互参照できるように、Collector ノードを構成する方法について説明しました。この手法は、意味のある情報を抽出するためにメッセージの相関を必要とする、イベント・ドリブン処理システムの基礎を形成します。この記事では、カウントまたは時間に基づくグループ化またはバッチ処理でのマッチング、各メッセージ・ソースで異なる相関パスと表現の指定など、Collector ノードの追加機能も説明しました。

Collector ノードは、一連のトランスポートにわたってイベント処理をサポートする包括的な機能セットを含みます。したがって、単一イベントではなく一連のイベントによって制御されるような処理で、 WebSphere Message Broker による、より複雑な新しい処理シナリオを作成できます。到着時間だけでなく、メッセージ・コンテンツに基づいてメッセージをグループ化する機能をこれに追加して、強力なメッセージ処理機能を自由に活用できます。



参考文献



著者について

Andrew Coleman は、英国にある IBM Hursley Software Lab の WebSphere Message Broker Development チームでアドバイザリー・ソフトウェア・エンジニアを務めています。Coleman は、Java UDN API、JavaCompute ノード、Collector ノードなど、ブローカー開発の重要な分野を 8 年間担当してきました。




記事の評価


サイト改善のため、ご意見をお寄せください。こちらのフォームからお願いいたします。



 


 


不充分・不完全である大変素晴らしい
 


この記事を共有する

del.icio.us del.icio.us newsing newsing FC2ブックマーク FC2ブックマーク
Choix! Choix! ニフティクリップ ニフティクリップ Yahoo!ブックマーク Yahoo!ブックマーク
MM/memo MM/memo CZブックマーク CZブックマーク livedoorクリップ livedoorクリップ
はてなブックマーク はてなブックマーク Buzzurl(バザール) Buzzurl(バザール)




上に戻る


    日本IBMについて プライバシー お問い合わせ