IBM Streams 4.2.1

チェックポイント

チェックポイントは、障害から復旧できるように、実行時にオペレーターの状態を永続化するプロセスです。障害発生時には、チェックポイントの状態から再設定することで、オペレーターを再始動できます。

オペレーターのチェックポイント処理 (および関連付けられた再設定) は次の 2 とおりの方法でトリガーできます。
  1. オペレーターが整合領域内にある場合、オペレーターの状態のチェックポイント処理および再設定は、整合領域プロトコルによって自動的に起動されます。オペレーターの状態は、整合領域のドレーン中にチェックポイント処理され、整合領域の再設定中に再設定されます。
  2. オペレーターが整合領域の外にある (つまり、自律領域内にある) 場合、SPL の config checkpoint 節を介して、チェックポイント処理を実行するようにオペレーターを構成できます。このタイプのチェックポイント処理は、オペレーター駆動型チェックポイント処理と定期的チェックポイント処理という 2 つのタイプにさらに細分化できます。オペレーター駆動型チェックポイント処理が Java/C++ オペレーター・コード内から明示的に要求されるのに対し、定期的チェックポイント処理はユーザーが定義した一定間隔で自動的に実行されます。障害が起こった場合、SPL ランタイムは、オペレーター状態を、最後に保存されたチェックポイントに自動的に再設定します。以下に、2 つのオペレーター例を示します。これらのオペレーターは、それぞれ、オペレーター駆動型チェックポイント処理と定期的チェックポイント処理を指定して構成されています。
stream<...> Out1 = MyOper1(...) {
   ...
   config
      checkpoint : operatorDriven; // operator driven checkpointing
      restartable : true;
}
 
stream<...> Out2 = MyOper2(...) {
   ...
   config
      checkpoint : periodic(10.5); // periodic checkpointing; checkpointing interval is 10.5 seconds
      restartable : true;
}

この SPL コード例では、オペレーター MyOper1 はオペレーター駆動型チェックポイント処理を実行するように構成されていて、オペレーター MyOper2 は 10.5 秒ごとに定期的チェックポイント処理を実行するように構成されています。

チェックポイント処理のタイプに関わらず、オペレーターのチェックポイント・データは、キーと値のペアの集合で表されます。各チェックポイントのキーは固有のシーケンス ID であり、値は状態を直列化したものです。チェックポイント処理が起動されると、SPL ランタイムは自動的にシーケンス ID を割り当て、オペレーターは Checkpointing API を使用して状態データを直列化します。結果のチェックポイントは、キーと値のペアの形式で、バックエンド永続データ・ストアに保管されます。再設定時に、SPL ランタイムは、どのシーケンス ID から再設定するのかを判別し、対応するチェックポイントをバックエンド・データ・ストアから取り出し、そのチェックポイントからのデータを非直列化してオペレーター状態を再設定します。

バージョン 4.0 以降、IBM® Streams は、 C++ と Java の両方で Checkpointing Runtime API の新しいセットを提供します。この新しい Checkpointing API は、 C++ と Java の Operator Runtime API の一部であり、オペレーターがそれ自体の状態データのチェックポイント処理および再設定を行うためのものです。バージョン 4.0 より前のバージョンの Checkpointing API は非推奨になり、ユーザーはこのページで説明されている新しい API を使用するよう奨励されていることに注意してください。

Checkpointing Runtime API は、StateHandler インターフェース、 Checkpoint クラス、および CheckpointContext クラス (これはオプションです) の 3 つのパートからなります。

C++ Checkpointing Runtime API の最初のパートは StateHandler クラスです。StateHandler クラスは、checkpoint() 関数や reset() 関数など、一連のコールバック・インターフェースを定義します。オペレーター開発者は、オペレーター実装にそれらのコールバック関数を指定し、SPL ランタイムにそれらのコールバック関数を登録します。実行時に、SPL ランタイムは登録されたコールバック関数 checkpoint() および reset() を自動的に起動して、オペレーターの状態のチェックポイント処理および再設定を行います。

オペレーター・クラスは、StateHandler インターフェースからの継承によってこれを実装します。以下の例は、独自の StateHandler コールバック関数を実装する C++ オペレーターを示しています。

このコード・スニペットはオペレーター実装のヘッダー・ファイルです。

#include <SPL/Runtime/Operator/StateHandler.h>
 
class MY_OPERATOR : public MY_BASE_OPERATOR, StateHandler // inherit from StateHandler
{
public:
    ...
 
    // callback functions from StateHandler.h
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToInitialState();
private:
    // variables myInt_ and myVector_ constitute the state of this operator
    uint32_t myInt_;
    std::vector<IPort0Type> myVector_;    
    Mutex myMutex_; // lock for guarding concurrent access to operator state
};

このコード・スニペットはオペレーター実装の C++ ファイルです。

// Constructor of operator
MY_OPERATOR::MY_OPERATOR()
    : myInt_(0)
{
    ...
    // register StateHandler callbacks
    getContext().registerStateHandler(*this);
}
 
// process() function for tuple processing
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
    // acquire lock
    AutoMutex am(myMutex_);
              
    // manipulate operator state during tuple processing
    IPort0Type & ituple = static_cast<IPort0Type &>(tuple);
    myInt_ ++;
    myVector_.push_back(ituple);
 
    // submit tuple downstream
    OPort0Type otuple(tuple.getAttributeValue(0), myInt_);
    submit(otuple, 0);
}
 
// checkpoint() callback to checkpoint operator state data
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
    // acquire lock
    AutoMutex am(myMutex_);
 
    SPLAPPTRC(L_TRACE, "Checkpointing with sequence id: " << ckpt.getSequenceId(), "MY_OP");
              
    // serialize state data to Checkpoint
    ckpt << myInt_;
    ckpt << myVector_; 
}
 
// reset() callback to reset operator state data
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
    // acquire lock
    AutoMutex am(myMutex_);
 
    SPLAPPTRC(L_TRACE, "Resetting with sequence id: " << ckpt.getSequenceId(), "MY_OP");
              
    // de-serialize state data from Checkpoint   
    ckpt >> myInt_;
    ckpt >> myVector_; 
}
 
// resetToIntialState() callback to reset operator to initial state
void MY_OPERATOR::resetToInitialState()
{
    // acquire lock
    AutoMutex am(myMutex_);
    myInt_ = 0;
    myVector_.clear();
}

同等の Java StateHandler インターフェースについて詳しくは、SPL Java オペレーター API 資料を参照してください。

C++ Checkpointing Runtime API の 2 番目のパートは Checkpoint クラスです。Checkpoint クラスは、チェックポイント・データに抽象化を提供します。チェックポイント処理中には、1 つの Checkpoint インスタンスがオペレーターの StateHandler::checkpoint() コールバック関数に渡されます。オペレーターは、<< 演算子を使用して、指定されたCheckpoint インスタンスにデータを直列化することによって、チェックポイント処理するデータを指定できます。SPL ランタイムは、チェックポイント処理されたデータを内部的に直列化し、そのデータをバックエンド・データ・ストアに転送します。再設定時には、1 つの Checkpoint インスタンスがオペレーターの StateHandler::reset() コールバック関数に渡されます。オペレーターは、>> 演算子を使用して、指定された Checkpoint インスタンスからデータを非直列化することによって、チェックポイントからのデータを抽出し、オペレーターの状態を再設定します。

Checkpoint クラスは、すべてのプリミティブ・データ型、SPL データ型、および標準的 C++ STL コンテナー型に対して、直列化と非直列化をサポートします。ネストされたデータ型の直列化および非直列化も自動的に処理されます。

直列化と非直列化に加えて、オペレーターのコードは Checkpoint::getSequenceId() 関数を呼び出すことによって、書き込みまたは再設定されるチェックポイントのシーケンス ID を取得することもできます。 整合領域内のオペレーターの場合、チェックポイントのシーケンス ID は、整合領域プロトコルによって割り当てられるマーカー ID と同じです。自律領域内にあるオペレーターの場合、シーケンス ID は、オペレーターごとのローカルな内部カウンターによって生成されます。どちらの場合も、シーケンス ID は正整数であり、新しいチェックポイントで増やされますが、シーケンス ID 番号が連続している保証はありません。

同等の Java Checkpoint インターフェースについて詳しくは、SPL Java オペレーター API 資料を参照してください。

オペレーターの状態には、SPL ロジック状態変数 (オプション) と状態データという 2 つのパートがあることに注意してください。ロジック状態変数 (ある場合) のチェックポイント処理および再設定が SPL ランタイムによって自動的に行われるのに対し、状態データのチェックポイント処理および再設定はオペレーターが指定する StateHandler::checkpoint() コールバックおよび StateHandler::reset() コールバックを介して行われます。

オペレーターにロジック状態変数がある場合、コード生成中に、SPL コンパイラーは、ロジック状態変数のチェックポイント処理および再設定を行うための 2 つの関数を自動的に生成します。それら 2 つの関数 checkpointStateVariables() および resetStateVariables() のシグニチャーは次のとおりです。

// auto-generated functions for checkpointing and resetting logic-state variables
void MY_BASE_OPERATOR::checkpointStateVariables(Checkpoint & ckpt);
void MY_BASE_OPERATOR::resetStateVariables(Checkpoint & ckpt);

チェックポイント処理中には、SPL ランタイムは、最初に StateHandler::checkpoint() コールバック関数を呼び出してオペレーター状態データをチェックポイント処理し、次に、自動的に checkpointStateVariables() 関数を呼び出してロジック状態変数をチェックポイント処理します。再設定時には、SPL ランタイムは、最初に StateHandler::reset() コールバック関数を呼び出してオペレーター状態データを再設定し、次に、自動的に resetStateVariables() 関数を呼び出してロジック状態変数を再設定します。

Checkpointing Runtime API の 3 番目のパートはオプションの CheckpointContext クラスです。オペレーターが自律領域内にあり、分散モードで実行し、かつ、SPL で config checkpointoperatorDriven または periodic(T) のいずれかに指定されている場合、1 つの CheckpointContext インスタンスがこのオペレーターに提供されます。この CheckpointContext インスタンスは、オペレーターのチェックポイント処理構成 (例えば、オペレーター駆動型チェックポイント処理と定期的チェックポイント処理のどちらが構成されているのか、定期的チェックポイント処理の間隔はどうなのかなど) を照会する関数を提供します。また、この CheckpointContext インスタンスは、オペレーター・コードでオペレーター駆動型チェックポイント処理を実行するための CheckpointContext::createCheckpoint() 関数も提供します。以下のコードは、オペレーター駆動型チェックポイント処理のためにオペレーター・コードで CheckpointContext::createCheckpoint() 関数を使用する方法を示しています。

// the header file which defines CheckpointContext class
#include <SPL/Runtime/Operator/State/CheckpointContext.h>
.... 
// operator code can use CheckpointContext::createCheckpoint() function for operator driven checkpointing;
// to do so, first retrieve the CheckpointContext instance
CheckpointContext * ckptContext = static_cast<CheckpointContext *>(this->getContext().getOptionalContext(CHECKPOINT));
if (ckptContext != NULL) {
    // then call CheckpointContext::createCheckpoint() to perform checkpointing
    ckptContext->createCheckpoint();
}

CheckpointContext::createCheckpoint() 関数は、オペレーター・コードから呼び出されると、オペレーターの StateHandler::checkpoint() コールバック関数および checkpointStateVariables() 関数を、オペレーターの状態をチェックポイント処理するために内部的に起動します。createCheckpoint() 関数はチェックポイント処理が終了した後で戻るため、チェックポイント処理は呼び出し側スレッドと同期します。チェックポイント処理が正常に終了すると、createCheckpoint() 関数は true を返します。チェックポイント処理中に何らかのエラーがある場合、createCheckpoint() 関数は SPL::DataStoreException エラーをスローします。チェックポイント処理が実行されない場合、createCheckpoint() 関数は何も行わずに false を返します。バージョン 4.0 以降、CheckpointContext::createCheckpoint() 関数は true を返すか、例外をスローするかのいずれかになりました。false を返すケースは予約済みです。

CheckpointContext インスタンスは、スタンドアロン・モードで実行するオペレーターや、SPL で config checkpoint が指定されていないオペレーターには使用可能ではありません。そのような場合には、OperatorContext::getOptionalContext(CHECKPOINT) 関数は NULL を返します。

同等の Java CheckpointContext インターフェースについて詳しくは、SPL Java オペレーター API 資料を参照してください。

Checkpointing API を使用する際には、以下のガイドラインに従ってください。
  1. Checkpointing API でのロック:

    整合領域駆動型チェックポイント処理または定期的チェックポイント処理の場合、StateHandler::checkpoint() コールバック関数が内部 SPL ランタイム・スレッドによって呼び出され、タプル処理またはバックグラウンド・オペレーター・スレッドと非同期です。したがって、SPL ランタイムでは、オペレーター・コードでユーザー定義ロックを管理して、チェックポイント処理スレッドがオペレーター状態への排他アクセス権を持つことを確実にする必要があります。オペレーター駆動型チェックポイント処理の場合、CheckpointContext::createCheckpoint() 関数を呼び出すスレッドが内部的に StateHandler::checkpoint() コールバック関数を実行します。checkpoint() コールバック関数が、呼び出し側スレッドによって既に保持されているロックを獲得しようとすると、呼び出し側スレッドはそれ自体をブロックして、デッドロックという結果になることがあります。SPL ランタイムでは、オペレーター状態のチェックポイント処理を行うスレッドが同じユーザー定義ロックを複数回獲得することでデッドロックを作成しないように、オペレーター・コードでユーザー定義ロックを管理することが必要です。このデッドロック問題を回避するために推奨される解決策の 1 つは、何らかの形で再入可能ロック (例えば boost::recursive_mutex) を使用することです。

  2. Checkpointing API の使用例:
    さまざまな種類のチェックポイント処理がありますが、Checkpointing API を適切に使用すれば、多様なシナリオで使用できるオペレーターを実装し、チェックポイント処理および再設定を適切に実行するのは簡単です。下の例で示されているオペレーターは、次のどのシナリオでも使用できます。
    • 整合領域内で使用して、整合領域駆動型チェックポイント処理および再設定を実行できます。
    • 整合領域の外側で使用して、分散モードで実行し、SPL で config checkpointperiodic(T) に設定されている場合は定期的チェックポイント処理を実行できます。
    • 整合領域の外側で使用して、分散モードで実行し、SPL で config checkpointoperatorDriven に設定されている場合はオペレーター駆動型チェックポイント処理を実行できます。
    • 整合領域の外側で使用して、分散モードで実行し、SPL で config checkpoint が設定されていない場合はチェックポイント処理も再設定も行わないことができます。
    • スタンドアロン・モードで実行し、チェックポイント処理も再設定も行わないことができます。

このコード・スニペットはオペレーター実装の MyOp_h.cgt ファイルです。

<%
    # use CodeGen API to query whether the operator is used inside a consistent region
    my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
    # if operator is outside consistent region, use CodeGen API to determine the kind of checkpointing (none, periodic, or operatorDriven)
    my $ckptKind = $model->getContext()->getCheckpointingKind();
    my @includes;
    if ($isInConsistentRegion) {
        push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
    }
    if ($ckptKind ne "none") {
        push @includes, "#include <SPL/Runtime/Operator/State/CheckpointContext.h>";
    }
    SPL::CodeGen::headerPrologue($model, \@includes);   
%>

<% if ($ckptKind eq "operatorDirven") { %>
#include <boost/thread/recursive_mutex.hpp>
<% } %>
 
class MY_OPERATOR : public MY_BASE_OPERATOR
<% if ($isInConsistentRegion || $ckptKind ne "none") {%>
, public StateHandler
<% } %> {
public:
    // constructor
    MY_OPERATOR();
              
    // destructor
    virtual ~MY_OPERATOR();
   
    // tuple processing for mutating ports
    void process(Tuple & tuple, uint32_t port);
    ...
 
<% if ($isInConsistentRegion || $ckptKind ne "none") { %>
    // StateHandler callbacks
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToIntialState();
<% } %>
 
private:
<% if ($ckptKind eq "operatorDirven") { %>
    // for operator-driven checkpointing, use a re-entrant lock to guard operator state
    // so that the thread calling createCheckpoint() does not deadlock itself
    boost::recursive_mutex myLock_; 
<% } else { %>
    // for all other cases, use SPL::Mutex
    Mutex myLock_;
<% } %>
    // variables myVar1_ and myVar2_ constitute this operator's state
    int myVar1_;
    float myVar2_;
};

このコード・スニペットはオペレーター実装の MyOp_cpp.cgt ファイルです。

<%
   # use CodeGen API to query whether the operator is used inside a consistent region
   my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
   # if operator is outside consistent region, use CodeGen API to determine the kind of checkpointing (none, periodic, or operatorDriven)
   my $ckptKind = $model->getContext()->getCheckpointingKind();
%>
...
 
<%if ($isInConsistentRegion || $ckptKind ne "none") {%>
 
// operator-provided StateHandler::checkpoint() callback
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
    <% if ($ckptKind eq " operatorDirven ") {
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is an re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // checkpoint operator state
    ckpt << myVar1_ << myVar2_;
}
 
// operator-provided StateHandler::reset() callback
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
    <% if ($ckptKind eq " operatorDirven ") {
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is an re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // reset operator state
    ckpt >> myVar1_ >> myVar2_;
}
 
// operator-provided StateHandler::resetToInitialState() callback
void MY_OPERATOR::resetToInitialState()
{
    <% if ($ckptKind eq " operatorDirven ") {
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is an re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // reset operator state to initial values
    myVar1_ = 0;
    myVar2_ = 0.0;
}
<%}%>
 
// tuple processing function
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
    <% if ($ckptKind eq " operatorDirven ") {
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is an re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // process tuple and manipulate operator state
    IPort0Type & ituple = static_cast<IPort0Type &>(tuple);
    myVar1_ += ituple.get_myInt();
    myVar2_ += ituple.get_myFloat();
 
    // perform operator driven checkpointing synchronously
    CheckpointContext * ckptContext = static_cast<CheckpointContext *>(this->getContext().getOptionalContext(CHECKPOINT));
    if (ckptContext != NULL && ckptContext->getKind() == CheckpointContext::operatorDriven) {
        ckptContext->createCheckpoint();
    }
}

このサンプル・コードでは、オペレーターは、operatorDriven チェックポイント処理を実行するように構成されているか、それとも他の種類のチェックポイント処理 (定期的または整合領域駆動型) を実行するように構成されているかによって違ってきます。operatorDriven チェックポイント処理の場合、オペレーター・コードは、オペレーター状態を保護するために再入可能ロックを使用することによって、CheckpointContext::createCheckpoint() 関数が process() 関数から呼び出されたときに、checkpoint() コールバック関数が、呼び出し側スレッドで、そのスレッドが既に保持しているのと同じロックを獲得しようとしてデッドロックを引き起こさないようにします。他の種類のチェックポイント処理の場合、オペレーター・コードは通常の SPL::Mutex ロックを使用します。

オペレーターがオペレーター駆動型チェックポイント処理をオペレーター・コード内で使用していない場合、デッドロックを回避するための再入可能ロックの使用は必要ありません。その場合、オペレーター・コードは単に通常の SPL::Mutex ロックを使用でき、コード生成時のチェックポイント処理タイプの検査を除去できます。