IBM Streams 4.3.0

整合領域内でウィンドウを使用するプリミティブ C++ オペレーター

整合領域内で C++ ウィンドウ操作ライブラリーを使用するオペレーターの状態をチェックポイント処理および再設定するには、オペレーターはウィンドウ状態管理関数を呼び出す必要があります。

ウィンドウ API 関数

整合領域内での状態管理をサポートするために、C++ ウィンドウ操作ライブラリーには以下の関数が用意されています。

virtual void drain();
未解決のウィンドウ処理をドレーンします。保留中のタプル排除アクションおよびウィンドウ・トリガー・アクションが完了するまで待機します。
virtual void checkpoint(Checkpoint & data);
ウィンドウ状態をチェックポイント処理して、指定されたオブジェクトに入れます。ウィンドウ状態にはウィンドウ内容と状態が含まれ、状態には排除ポリシーとトリガー・ポリシーが関連付けられています。
virtual void reset(Checkpoint & data);
指定されたチェックポイントから読み取った状態にウィンドウを復元します。
virtual void resetToInitialState();
ウィンドウを初期状態に復元します。

ウィンドウ・イベント

WindowEvent インターフェースを実装する 1 つ以上のリスナーにウィンドウを関連付けることができます (2 つのリスナーが同じイベントを受け取ることはできません)。ウィンドウでチェックポイント処理が行われるか、再設定されるか、または初期状態への再設定が行われると、 ウィンドウの直列化ハンドラーとして登録された 1 つのリスナーが、ウィンドウでチェックポイント処理が行われるか、再設定されるか、または初期状態への再設定が行われた直後に、 イベントを受信します。WindowEvent インターフェースは以下の関数を含んでいます。

virtual void onCheckpointEvent(Checkpoint & ckpt) const {}
このイベントは、指定されたチェックポイントにウィンドウが保存された後に発生します。イベント・ハンドラーは、その状態をチェックポイント・ストリームに書き込む責任があります。
virtual void onResetEvent(Checkpoint & ckpt) {}
このイベントは、指定されたチェックポイントからウィンドウの状態が読み取られた後に発生します。イベント・ハンドラーは、チェックポイント・ストリームに保存されたその状態を読み取る責任があります。
virtual void onResetToInitialStateEvent() {}
このイベントは、ウィンドウの状態が初期設定された後に発生します。イベント・ハンドラーは、その状態を初期設定する責任があります。

ウィンドウを使用する C++ プリミティブ・オペレーター

状態ハンドラー
ウィンドウを使用する C++ オペレーターには、ウィンドウのドレーン、チェックポイント処理、および再設定を行うために StateHandler 実装環境が必要です。以下のコード・サンプルは、 StateHandler 実装環境がどのようにウィンドウ API を起動するのかを示しています。
void MY_OPERATOR::drain() {
    . . .
    _window.drain();
}

void MY_OPERATOR::checkpoint(Checkpoint & ckpt) {
    . . .
    _window.checkpoint(ckpt);
}

void MY_OPERATOR::reset(Checkpoint & ckpt) {
    . . .
    _window.reset(ckpt);
}

void MY_OPERATOR::resetToInitialState() {
    . . .
    _window.resetToInitialState();
}
タンブリング・ウィンドウ・サマライザー
タンブリング・ウィンドウ・サマライザーを使用して、ウィンドウ操作ライブラリーがタンブリング・ウィンドウ内のすべてのタプルを保持する必要をなくすことができます。サマライザーは状態を含むことができ、したがって、ウィンドウのチェックポイント処理または再設定が行われるときに状態を保存またはロードする必要があります。TumblingWindowSummarizer インターフェースは以下のイベントを含んでいます。
virtual void onCheckpointEvent(Checkpoint & ckpt) {}
このイベントは、現行ウィンドウの状態がチェックポイント処理されると発生します。指定されたチェックポイント・ストリームに状態を直列化するように、サマライザー・コードを作成してください。
virtual void onResetEvent(Checkpoint & ckpt) {}
このイベントは、現行ウィンドウが、指定されたチェックポイントによって提供される状態に復元されると発生します。指定されたチェックポイント・ストリームから読み取ることによって状態を復元するように、 サマライザー・コードを作成してください。

ウィンドウは、初期状態に再設定されると、そのウィンドウのサマライザーを再作成します。

次の例は、 整合領域に参加するためのウィンドウ操作されたタプルの数を計算するタンブリング・ウィンドウ・サマライザーを更新する方法を示しています。

#define MY$OP MY_OPERATOR_SCOPE::MY_OPERATOR
struct MY_OPERATOR::MyCountSummarizer : public  
            SPL::TumblingWindowSummarizer<MY$OP::IPort0Type,MY$OP::PartitionByType>  {
  MY$OP& operator_;
  uint64_t count_;

  MyCountSummarizer(SPL::Operator& oper): operator_(static_cast<MY$OP&>(oper)) {
    Count_ = 0;
  }

  void onTupleInsertionEvent(MY$OP::IPort0Type const& tuple) {
      count_++;
  }

  void onCheckpointEvent(SPL::Checkpoint & ckpt) const {
      ckpt << count_;
  }

  void onResetEvent(SPL::Checkpoint & ckpt) {
      ckpt >> count_;
  }
};
パーティション・タイプ・ヘルパー・クラス
SPL::CodeGen モジュール内の emitClass 関数を使用する、生成された PartitionType ヘルパー・クラスは、直列化サポートを提供します。