IBM Streams 4.3.0

オペレーターの実装

オペレーターは、 SPL ランタイム・クラス SPL::Operator を継承する C++ クラスとして実装されます。

オペレーター・ロジックの実装に一般的に使用される重要なメンバー関数には、以下の関数があります。
  • コンストラクターおよびデストラクター
  • ポート作動可能通知: allPortsReady
  • タプル処理: process(Tuple)
  • パンクチュエーション処理: process(Punctuation)
  • 終了通知: prepareToShutdown
図 1 に、これらのメンバー関数の呼び出し順序を示します。
図 1. 呼び出し順序
この図については、前後のテキストで説明されています。

オペレーター・インスタンスは、コンストラクターの呼び出しによってランタイム・ライフサイクルを開始します。オペレーターは、ポートの準備ができるまで、 タプルおよびパンクチュエーションを受信も送信もできません。オペレーターのポートの準備が整うと、オペレーターは、タプル処理関数およびパンクチュエーション処理関数の呼び出しを受け取り始め、適宜タプルをサブミットできます。allPortsReady 関数は、タプルの受信およびサブミットのためのポートの準備ができたことを示す、オペレーターへの通知として呼び出されます。 オペレーターのタプル処理関数およびパンクチュエーション処理関数の呼び出しは、allPortsReady 通知を受け取る前から可能です。タプル・サブミットが、タプルまたはパンクチュエーション処理関数のコンテキストの外で実行される場合は、allPortsReady 通知をまず受け取る必要があります。 オペレーターをホストする処理要素 (PE) がシャットダウンされようとしている場合、オペレーターは prepareToShutdown 関数の呼び出しを受け取ります。 これは、タプル処理関数およびパンクチュエーション処理関数がアクティブである間に発生することがあります。最終的には、デストラクターが呼び出され、オペレーターが割り振ったリソースをすべて解放します。

タプルおよびパンクチュエーションのサブミットと処理のガイドライン

  • タプル/パンクチュエーションを受け取る前に、すべてのオペレーターを構成します。
  • コンストラクターからタプルまたはパンクチュエーションをサブミットすることはできません。これを行うと C++ 例外が起こるためです。
  • allPortsReady 関数が呼び出されたら、タプルおよびパンクチュエーションのサブミットを開始できます。
  • allPortsReady 関数が呼び出される前にタプルおよびパンクチュエーションを受け取ることがあります。
  • 入力ポートで最終パンクチュエーションが受け取られると、以後その入力ポートからタプルまたはパンクチュエーションが受け取られることはありません。
  • 出力ポートで最終パンクチュエーションがサブミットされると、以後そのポートにサブミットされる後続のすべてのタプルおよびパンクチュエーションは、サイレントに破棄されます。

入力タプルの処理

タプルおよびパンクチュエーションを処理するためにオペレーターに実装する処理関数は、以下の 3 つの中から選択できます。
  • void process(Tuple const & tuple, uint32_t port) は、非ミュータブル入力ポート用のタプル処理関数です。tuple パラメーターは、 インデックス port の入力ポートに受信されるタプルを表します。ポートのインデックス付けは 0 から始まります。
  • void process(Tuple & tuple, uint32_t port) は、ミュータブルな入力ポート用のタプル処理関数です。
  • void process(Punctuation const & punct, uint32_t port) は、すべての入力ポート用のパンクチュエーション処理関数です。punct パラメーターは、 インデックス port の入力ポートに受信されるパンクチュエーションを表します。

入力ポートは、オペレーター・モデルによる指定に従い、ミュータブルか非ミュータブルのいずれかです。 オペレーター・モデルでミュータブル と宣言された入力ポートの場合、 オペレーター開発者は、そのポートに関するロジックを追加する際に、非定数タプル参照を受け取る process 関数を実装する必要があります。その実装の一部としてタプルを変更することは許可されます。オペレーター・モデルで非ミュータブル と宣言された入力ポートの場合、 オペレーター開発者は、そのポートに関するロジックを追加する際に、定数タプル参照を受け取る process 関数を実装する必要があります。その実装の一部としてタプルを変更することは許可されません。 一般的には、1 つのオペレーターでは、この 2 つのタプル処理関数のうちどちらか一方のみを実装します。ただし、一般的なケースで、1 つのオペレーターがミュータブルと非ミュータブルの両方の入力ポートを持つことは可能であり、そのような場合には両方の関数を実装することが必要になります。

パラメーターとして process 関数に引き渡されたタプルは、その process 関数呼び出しの存続期間中使用できます。そのタプルへのポインターまたは参照を、現行の処理 (process) 呼び出し以外のコンテキストで使用するために保管しないでください。 処理呼び出しから受け取られたタプルは、その処理呼び出しのコンテキスト内でサブミット呼び出しが実行される場合は、安全にサブミットできます。 タプルをオペレーター状態の一部として保管し、複数の処理呼び出しにわたって利用できるようにする場合は、コピーを作成してください。

ステートフル・オペレーターと並行性

異なる入力ポートの処理関数を、 マルチスレッド実行によってランタイム・システムから並行して呼び出すことができます。また、 ファンイン (1 つのポートに接続された複数のストリーム) がある入力ポートの場合は、 その入力ポートの処理関数を複数のストリームに並行して呼び出すことができます。一般的に、入力ポートに関連付けられている処理関数からアクセスされる 状態を、オペレーターが維持する場合は、その状態を同時アクセスから保護して、 実行の正確性を保証する必要があります。 ウィンドウがあるなら、状態があるとみなされます。 SPL には、クリティカル・セクションの作成に役立つ 3 つのユーティリティー・クラス、すなわち、MutexAutoMutex、および AutoPortMutex があります。 Mutex は、pthread mutex の機能のラッパー・クラスです。 AutoMutex は、クリティカル・セクションを Mutex オブジェクトから作成するクラスです。これは、作成されるときに mutex をロックし、 スコープの外に出るとき (すなわちオブジェクトが破棄されるとき) にロックを解除します。AutoPortMutexAutoMutex に似ていますが、これは、SPL ランタイムが、オペレーターの処理関数の同時呼び出しは不可能であると認識している場合に、実行されない分岐を減らすことができます。オペレーターの入力処理関数が並行して呼び出されるかどうかは、 使用されるオペレーター融合構成、上流オペレーターのプロパティー、転送オプション、およびスレッド化ポートの存在に応じて異なります。As a result, at operator development time, it is not possible to tell whether the operator's input process functions are called concurrently or not. It is important that the developer always protects the state against concurrent accesses. The SPL run time makes sure that the locking and unlocking cost is not paid when no concurrency is involved. For this optimization to take place seamlessly, use the AutoPortMutex class. An example use is as follows:

class MY_OPERATOR ... {
  ...  
private:
  Mutex mutex_;
};

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
  ...
  { // mutex_.lock() is called upon entry if needed
    AutoPortMutex am(mutex_, *this);
    ... // access state
  } // mutex_.unlock() is called upon exit if needed
  ...
}

ソース・オペレーターとオペレーター・スレッド

ここまでは、 入力ポートに関連付けられた処理関数の呼び出しによってトリガーされるオペレーター・ロジックの実装について説明してきました。ところが、source オペレーターには入力ポートがありません。また、着信タプルによってトリガーされない処理を実行するのに適したオペレーターもあります。このような目的のために、SPL のオペレーター API には、入力ポートに関連付けない追加の process 関数が用意されています。これを、非入力 処理関数と呼びます。

void process(uint32_t idx) は、自身のスレッド内で実行される処理関数です。idx パラメーターは、このスレッドのインデックスであり、オペレーターのローカルなインデックスです。

非入力処理関数のインデックス引数ということから判るように、実行するスレッドは複数存在することができます。これらのスレッドは、 uint32_t createThreads(uint32_t num) 関数の呼び出しによって作成されます。この関数は、num 個のスレッドを作成します。これらのスレッドを、オペレーター・スレッド と呼びます。オペレーター・スレッドは、非入力処理関数を一度呼び出し、 そのローカルなスレッド・インデックスを受け渡します。

この機能を使用して、ソース・オペレーターを以下のように実装できます。

void MY_OPERATOR::allPortsReady() {
  createThreads(1);
}

void MY_OPERATOR::process(uint32_t) {
  while(!getPE().getShutdownRequested()) {
    ...
  }
}

この例では、createThreads 関数が allPortsReady 関数の内部から呼び出されています。 これは必ず必要というのではありませんが、良い使用方法です。非入力処理関数に、タプルをサブミットするコードが含まれている場合 (これは非常に一般的です) は、スレッドの作成を遅延させることで、オペレーターへの接続がセットアップされる前にタプルがサブミットされることを防止できます。allPortsReady が呼び出される前にタプルをサブミットするのは無効です。 注目すべき別の重要な点は、getPE().getShutdownRequested() を使用して、このオペレーターを管理する処理要素のシャットダウン状況を確認しているという点です。 非入力処理関数が PE のシャットダウン時に戻らないと、オペレーターおよび PE が必要に応じて強制終了されますが、これは、オペレーターで指定されているシャットダウン処理が実行されないため、望ましくありません。 詳細は、ブロッキング処理とシャットダウン処理を参照してください。

追加のオペレーター・スレッドを使用して、マルチスレッド・オペレーターを 作成できます。オペレーター・ロジックで複数のスレッドからタプルをサブミットする必要がある場合、 オペレーターのスレッド化の動作をオペレーター・モデルに指定する際には注意が必要です。

SPL 言語ランタイムは、ソース・オペレーターの全スレッドの実行が終了した時に、ソース・オペレーターの実行を完了したと見なします。ソース・オペレーターにはタプルまたはパンクチュエーション処理関数がないため、 オペレーター・スレッドの完了が、オペレーターの実行終了を 意味します。言語ランタイムは、ソース・オペレーターの実行完了時に、すべての出力ポートで最終パンクチュエーションが送信されるようにします。 最終パンクチュエーションについて詳しくは、パンクチュエーション処理を参照してください。ランタイムは、 SPL ランタイム API を使用して作成されたスレッドのみを追跡します。オペレーターが、サード・パーティーのライブラリーで作成されたような他のスレッドに依存している場合は、SPL オペレーター・スレッドを作成して、その外部スレッドに使用させる必要があります。そうしないと、ランタイムは出力ポートで最終パンクチュエーションを早期に送信してしまいます。

同じように、非ソース・オペレーターは、すべてのオペレーター・スレッドが終了し、最終パンクチュエーションがすべての入力ポートで受信され、全部処理されると、完了したと見なされます。 一般的なケースでは、最終パンクチュエーションは、SPL 言語ランタイムによって自動的に入力ポートから出力ポートに転送されます。ただし、オペレーター・モデルで最終パンクチュエーションの自動転送がオフになっている場合は、オペレーター開発者の責任で、最終パンクチュエーションをサブミットする必要があります。ランタイムは、完了した非ソース・オペレーターの出力ポートでは最終パンクチュエーションをサブミットしません。

タプルおよびパンクチュエーションのサブミット

サブミット関数を使用して、タプルおよびパンクチュエーションをオペレーターの出力ポートにサブミットできます。タプル用のサブミット関数には以下の 2 つがあります。
  • void submit(Tuple const & tuple, uint32_t port): tuple パラメーターは、インデックス port の出力ポートからサブミットされるタプルを表します (インデックスは 0 から始まる)。
  • void submit(Tuple & tuple, uint32_t port) は、1 番目の関数に似ていますが、受け渡されたタプルを、このサブミット呼び出しの結果として変更することができます。

非ミュータブル出力ポートの場合、両方のサブミット関数が有効で、これらは機能的に同等です。 ミュータブル・ポートの場合は、非定数バージョンのみ有効です。 両方のバージョンとも、指定されたポートのタプル型と同じではない具象型のタプルが受け渡されると、結果としてランタイム・エラーになります。

オペレーター・モデルでミュータブルと宣言された出力ポートの場合、 オペレーター開発者は、そのポートでタプルをサブミットする際に、非定数タプル参照を受け取る submit 関数を使用する必要があります。 サブミット呼び出しがその処理の一部としてタプルを変更すると想定されます。オペレーター・モデルで非ミュータブルと宣言された出力ポートの場合、 オペレーター開発者は、サブミット呼び出しがその処理の一部としてタプルを変更しないことを保証されます。 したがって、どちらのサブミット関数でも使用できます。サブミット呼び出しに使用される submit 関数は、 C++ 言語の関数解決規則によって定義されます。以下に、そのさまざまなケースを示します。

表 1. サブミット呼び出しの有効性
ポートのミュータビリティー タプル 呼び出しサイト C++ 解決 有効性
ミュータブル 非定数 (例: Tuple & t =... ) submit(t, 0) 非定数 submit 有効
ミュータブル 定数 (例: Tuple const & t =...) submit(t, 0) 定数 submit 無効
非ミュータブル 非定数 (例: Tuple & t =... ) submit(t, 0) 非定数 submit 有効
非ミュータブル 定数 (例: Tuple const & t =...) submit(t, 0) 定数 submit 有効

表 1 で示されているように、 定数タプルをミュータブル・ポートにサブミットする場合は細心の注意を払う必要があります。これは無効であり、SPL 言語ランタイムは例外をスローして、 ランタイム・エラーとなります。SPL コンパイラーは、そのオペレーターに非ミュータブル出力ポートがない場合、 生成されるオペレーター・コード内で非定数サブミット関数をシャドーイングすることによって、強制的に C++ コンパイル・エラーとします。この処置の結果、ランタイム・エラーが後で発生するのではなく、未解決のサブミット呼び出しとなります。

パンクチュエーション用には、単一の submit 関数があります。

void submit(Punctuation const & punct, uint32_t port): punct パラメーターは、インデックス port の出力ポートからサブミットされるパンクチュエーションを表します。

ポートのミュータビリティー

入力ポートおよび出力ポートのタプルのミュータビリティーの設定は、オペレーター・モデル内に定義されます。要約すると、入力ポートの場合は、tupleMutationAllowed プロパティーを true に設定すると、着信タプルの変更を意図していることを宣言する意味になります。 出力ポートの場合は、tupleMutationAllowed プロパティーを true に設定すると、サブミットされるタプルを変更することを許可するという意味になります。 ポートのミュータビリティーは、各オペレーターを相互に接続する方法を制限するものではありません。SPL 言語ランタイムは、必要に応じてコピーを作成することで、非互換性を処理するからです。ポート・ミュータビリティーの設定は、オペレーター・フュージョンがある場合にパフォーマンスに影響を与えます。 例えば、フュージョン下で、ミュータブル出力ポートは、コピーを必要とすることなく、タプルをミュータブル入力ポートに受け渡すことができます。 一方、ファンアウト構成では、1 つの出力ポートからサブミットされたタプルを、コピーを作成することなく、複数の非ミュータブル入力ポートに受け渡すことができます。 以下の図に、ポート・ミュータビリティーの設定に関する一般ガイドラインを示します。

図 2. ポート・ミュータビリティーの設定
入力ポートのミュータビリティー
出力ポートのミュータビリティー

パラメーターへのアクセス

非汎用オペレーター (前に説明したプラグマを使用するオペレーター) の場合、自動生成される便利な関数を使用して、タイプ・セーフな方法でパラメーターにアクセスできます。 これらの関数は、オペレーター生成中に SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE プラグマが展開されるときに作成されます。 これらのメンバー関数は、getParameter_<param-name> という名前であり、アプリケーションの SPL コードに指定されているパラメーターの最初の式値と同じ型の値を返します。つまり、以下のようになります。

<param-type> getParameter_<param-name>() const;

このヘルパー関数が生成されるのは、 パラメーターの式がすべて属性のない式である (すなわち、値が実行時にしか分からないストリーム・タプルおよび属性が含まれていない) 場合のみです。 より一般的な方法で汎用オペレーターの内部からパラメーター値にアクセスする方法については、 汎用オペレーターの実装を参照してください。非汎用オペレーターのパラメーター・アクセスの例を、 以下の SPL セグメントで考えてみます。

stream<...> MyStream = MyOperator(...) {
  param size : 10;
}

パラメーター size の値は、以下のようにして 取得できます。

int32 size = getParameter_size();
非汎用 (および汎用) オペレーターには、よりリフレクティブな方法でパラメーター値にアクセスするために、さらに 3 つの API が用意されています。
// Return the set of all parameter names
const std::tr1::unordered_set<std::string>& getParameterNames() const

class ParameterValue {
  public:
    virtual bool isValue() = 0;
    virtual bool isExpression() = 0;
    virtual ConstValueHandle& getValue() = 0;
};

パラメーターの各式には、関連付けられた対応する ParameterValue があります。 オペレーター・モデル内でパラメーターの expressionModeExpression または Attribute の場合、isExpression()true を返し、isValue()false を返します。 他のすべての expressionMode 値の場合、isValue()true を返し、isExpression()false を返します。 isValue()true の場合、式の値を返すために getValue() を呼び出すことができます。

オペレーター・モデル内でパラメーターの allowAnytrue であり、 明示的にリストされていないパラメーターである場合、コンパイラーはそれぞれの式を調べます。 ストリーム属性がある場合、その式の ParameterValueisExpression() == true になります。 ストリーム属性が参照されない場合、isValue()true を戻します。

typedef std::vector<ParameterValue*> ParameterValueListType;
typedef std::tr1::unordered_map<std::string, ParameterValueListType> ParameterMapType;
	
// Return the vector of parameter values for a given parameter name
const  ParameterValueListType& getParameterValues(std::string const & param) const

// Return the map of parameter names to parameter values
virtual ParameterMapType& getParameters()

getParameterNames 関数を使用して、特定のパラメーターが存在するかどうかを検査できます。 この検査によって、リフレクティブでないパラメーター・アクセス API では処理できない、オプション・パラメーターの扱いが可能になります。 パラメーターの値にアクセスするには、getParameterValues 関数を使用します。 パラメーターの名前は、string 引数として受け渡されます。複数のパラメーター式値を使用でき、 ConstValueHandle メンバー関数を使用して、isValue() == true の各 ParameterValue の型と値を抽出できます。 param がこのオペレーター・インスタンスの有効なパラメーター名ではない場合、SPLRuntimeInvalidArgumentException がスローされます。

使用例を以下に示します。
int32 size = 0; // the default
if (getParameterNames().count("size"))  {
  Operator::ParameterValueListType& sizeValues = getParameterValues("size");
	assert (sizeValues[0]->isValue());
  size = sizeValues[0]->getValue();  // read the first expression value
}

便利関数 hasParameter および getParameter は非推奨です。 パラメーターにアクセスするためのこれらの新しいルーチンに置き換えてください。

ConstValueHandle およびリフレクティブ・タイプ・システムの詳細については、リフレクティブ・タイプ・システムの使用法を参照してください。