com.ibm.streams.operator.state

Interface StateHandler

  • All Superinterfaces:
    java.lang.AutoCloseable, java.io.Closeable
    All Known Implementing Classes:
    DelegateStateHandler, OrderedStateHandlers, SortedTupleWindow, StatefulWindowListener, TumblingWindowSort


    public interface StateHandler
    extends java.io.Closeable
    Handler for operator state. If an Operator is an instance of StateHandler, it is automatically registered as a state handler. Otherwise, the operator registers an instance of this interface using OperatorContext.registerStateHandler(StateHandler).
    Operators that need to manage state are encouraged to always use a StateHandler, either by implementing StateHandler or by registering a state handler.

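    The SPL Runtime invokes StateHandler methods when the operator is checkpointing (periodically or operator driven) or is in a consistent region. Both cases are described in the paragraphs that follow.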

    StateHandler implementations should be careful to honor the contract of this interface to allow use in future scenarios. For example, no assumptions should be made about the operator being in a consistent region or checkpointing, and tuple submission should only occur from drain().


    When periodic checkpointing is enabled, the SPL Runtime calls checkpoint(Checkpoint) for each registered state handler at the configured period.
    With operator driven checkpointing, the operator calls CheckpointContext.createCheckpoint() to create a checkpoint, which results in a callback to checkpoint(Checkpoint) for all registered handlers. These callbacks occur on the thread that called createCheckpoint().
    After PE restart, the SPL Runtime will reset the operator to a persisted state by calling reset(Checkpoint) or resetToInitialState().
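
    The checkpoint and reset callbacks described above form a round trip: whatever checkpoint(Checkpoint) writes, reset(Checkpoint) must read back in the same order. The following is a minimal sketch of that idea, not the library's implementation. InMemoryCheckpoint is a hypothetical stand-in for Checkpoint backed by a byte array, and CountingState is a hypothetical operator-like class whose methods mirror checkpoint(Checkpoint), reset(Checkpoint) and resetToInitialState(). In a real handler the streams would come from the Checkpoint argument.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/** Hypothetical stand-in for a checkpoint: an in-memory byte buffer. */
class InMemoryCheckpoint {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

    ObjectOutputStream openForWrite() throws IOException {
        bytes.reset(); // each checkpoint replaces the previous content
        return new ObjectOutputStream(bytes);
    }

    ObjectInputStream openForRead() throws IOException {
        return new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    }
}

/** Hypothetical operator-like class whose only state is a running tuple count. */
class CountingState {
    private long tupleCount;

    void onTuple() { tupleCount++; }

    long getTupleCount() { return tupleCount; }

    /** Mirrors checkpoint(Checkpoint): persist everything reset will need. */
    void checkpoint(InMemoryCheckpoint checkpoint) throws IOException {
        try (ObjectOutputStream out = checkpoint.openForWrite()) {
            out.writeLong(tupleCount);
        }
    }

    /** Mirrors reset(Checkpoint): restore fields in the order they were written. */
    void reset(InMemoryCheckpoint checkpoint) throws IOException {
        try (ObjectInputStream in = checkpoint.openForRead()) {
            tupleCount = in.readLong();
        }
    }

    /** Mirrors resetToInitialState(): no persisted state exists yet. */
    void resetToInitialState() { tupleCount = 0; }
}
```

    Keeping the write and read code side by side, as here, makes it easier to keep the two in step when fields are added to the state.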


    In a consistent region, an operator drains its processing and persists its state for the cut point defined by the establishing operator. The SPL runtime calls drain() and checkpoint(Checkpoint) at the correct times to ensure all operators in a region drain their processing and persist their state, consistent with having seen all tuples and marks up to the point defined by the establishing operator.
    An operator becomes consistent with this sequence:

    1. The operator has processed all tuples and marks on any data (non-control) input ports in the region.
    2. The operator has drained its processing using drain() for all state handlers.
    3. The operator has processed all tuples and marks on any control input ports in the region.
    4. The operator has persisted its state using checkpoint(Checkpoint) for all state handlers.
    Note that each of these steps is optional. For example, a stateless operator may perform only step 2 to drain processing.
    If the operator code enables non-blocking checkpointing by invoking ConsistentRegionContext.enableNonBlockingCheckpoint(), the SPL Runtime invokes prepareForNonBlockingCheckpoint(long) after drain(). Tuple flow resumes once prepareForNonBlockingCheckpoint(long) returns. Meanwhile, the SPL Runtime delegates an internal thread to execute checkpoint(Checkpoint) in the background and tracks the background checkpointing internally.
    For start operators of a consistent region, the regionCheckpointed(long) method is called once all the operators in the consistent region have been drained and checkpointed, including those operators performing non-blocking checkpointing.
    After a failure in the region, the SPL Runtime will reset the operator to a consistent state by calling reset(Checkpoint) or resetToInitialState().


    Since:
    InfoSphere® Streams Version 4.0
    • Method Detail

      • drain

        void drain()
            throws java.lang.Exception
        Drain any outstanding processing. Once this method returns, the operator is indicating that it has drained all outstanding processing for all input ports.
        The method may:
        • Submit tuples to output ports.
        • Interact with external systems.
        • Wait for background activity to drain outstanding work.

        For a consistent region:

        • This method is called once all tuples and marks for the current cut for all data (non-control) input ports in the region have been processed, meaning that the process and processPunctuation methods have returned for all tuples and marks.
        • A permit is implicitly acquired before the method is called, and released once it returns.
        • This method is not called concurrently with tuple or mark processing for any data (non-control) input ports in the region.
        • This method may be called concurrently with tuple or mark processing for any autonomous input ports.

        Throws:
        java.lang.Exception - Exception attempting to drain state.
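
        One of the permitted behaviours listed above is waiting for background activity to finish. The following is a minimal sketch of that pattern, assuming an operator that hands each tuple to a worker thread. AsyncWriterSketch, its process method and the written list (standing in for an external system) are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical operator-like class that hands tuples to a background worker. */
class AsyncWriterSketch {
    // Daemon worker so the sketch never keeps the JVM alive on its own.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "async-writer-sketch");
        t.setDaemon(true);
        return t;
    });
    private final List<Future<?>> pending = new ArrayList<>();
    // Stands in for an external system that receives the writes.
    private final List<String> written = Collections.synchronizedList(new ArrayList<>());

    /** Tuple processing: queue the write and return immediately. */
    synchronized void process(String tuple) {
        pending.add(worker.submit(() -> { written.add(tuple); }));
    }

    /** Mirrors drain(): block until every queued write has completed. */
    synchronized void drain() throws Exception {
        for (Future<?> f : pending) {
            f.get(); // a failed write surfaces here as ExecutionException
        }
        pending.clear();
    }

    /** Returns a copy of everything written so far. */
    List<String> written() { return new ArrayList<>(written); }

    void close() { worker.shutdown(); }
}
```

        After drain() returns in this sketch, no write is outstanding, which matches the contract that the operator has drained all outstanding processing.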
      • reset

        void reset(Checkpoint checkpoint)
            throws java.lang.Exception
        Reset the operator to a previous state defined by checkpoint.

        When in a consistent region, checkpoint corresponds to a previous successful drain and checkpoint cut cycle for the entire region.

        Throws:
        java.lang.Exception - Exception attempting to reset state.
      • checkpoint

        void checkpoint(Checkpoint checkpoint)
                 throws java.lang.Exception
        Checkpoint the operator state to support resetting of its state to the saved state, using reset(Checkpoint).

        For a consistent region this method is called when:

        • drain() has completed.
        • All background threads have released processing permits.
        • All control input ports (if any) have been drained.

        Throws:
        java.lang.Exception - Exception attempting to checkpoint state.
      • retireCheckpoint

        void retireCheckpoint(long id)
                       throws java.lang.Exception
        Called when a checkpoint can be retired.
        Parameters:
        id - Id of checkpoint.
        Throws:
        java.lang.Exception - Exception attempting to retire a checkpoint.
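
        A handler that stores data outside the checkpoint itself can use this callback to clean up. The following is a minimal sketch under that assumption. RetiringStoreSketch, the artifact naming scheme and the simplified checkpoint(long) signature are hypothetical; a real handler would receive a Checkpoint object and delete the external data rather than just forgetting its name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical handler that keeps one external artifact per checkpoint id. */
class RetiringStoreSketch {
    // Checkpoint id -> name of the artifact written for that checkpoint.
    private final Map<Long, String> artifacts = new TreeMap<>();

    /** Mirrors checkpoint(Checkpoint): remember what was written for this id. */
    void checkpoint(long id) {
        artifacts.put(id, "state-" + id + ".bin"); // hypothetical naming scheme
    }

    /** Mirrors retireCheckpoint(long): the artifact for this id is no longer needed. */
    void retireCheckpoint(long id) {
        artifacts.remove(id); // a real handler would also delete the external data
    }

    /** Artifacts still retained, ordered by checkpoint id. */
    List<String> liveArtifacts() { return new ArrayList<>(artifacts.values()); }
}
```

        Retiring an id that was never recorded is harmless in this sketch, which keeps the callback safe to invoke more than once.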
      • resetToInitialState

        void resetToInitialState()
                          throws java.lang.Exception
        Reset the operator to its initial state.

        If the operator is in a consistent region, this method is called if a reset occurs before the first successful completion of a drain and checkpoint cycle for the region.
        This method is not called when the operator first initializes.

        Throws:
        java.lang.Exception - Exception attempting to reset the operator to its initial state.
      • prepareForNonBlockingCheckpoint

        default void prepareForNonBlockingCheckpoint(long id)
                                              throws java.lang.Exception
        Called to prepare for non-blocking checkpoint.

        This method is executed only when the operator enables non-blocking checkpointing with the ConsistentRegionContext.enableNonBlockingCheckpoint() method. This method is guaranteed to execute after drain() and before checkpoint(Checkpoint). It is also guaranteed to execute before the consistent region resumes tuple processing.
        This method was added after the interface was released in Version 4.0. It is defined as a default method for compatibility reasons. The default implementation is sufficient if the operator does not support non-blocking checkpoint.

        Parameters:
        id - Id of checkpoint.
        Throws:
        java.lang.Exception - Exception attempting to prepare for non-blocking checkpoint.
        Since:
        IBM Streams Version 4.2
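
        Because tuple processing resumes before the background checkpoint(Checkpoint) call runs, one possible approach is to copy the state in this method and persist the copy later. The following is a minimal sketch of that approach, not a prescribed implementation. SnapshotCounterSketch, its process and count methods, and the checkpoint(long, ObjectOutputStream) signature are hypothetical simplifications; a real handler would obtain the stream from the Checkpoint argument.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical operator-like class that counts tuples per key. */
class SnapshotCounterSketch {
    // Live state, mutated by tuple processing.
    private final Map<String, Long> counts = new HashMap<>();
    // Frozen copies of the state, keyed by checkpoint id.
    private final Map<Long, HashMap<String, Long>> snapshots = new ConcurrentHashMap<>();

    synchronized void process(String key) { counts.merge(key, 1L, Long::sum); }

    synchronized long count(String key) { return counts.getOrDefault(key, 0L); }

    /** Mirrors prepareForNonBlockingCheckpoint(long): copy state before tuples resume. */
    synchronized void prepareForNonBlockingCheckpoint(long id) {
        snapshots.put(id, new HashMap<>(counts));
    }

    /** Mirrors the background checkpoint call: persist the copy, not the live map. */
    void checkpoint(long id, ObjectOutputStream out) throws IOException {
        HashMap<String, Long> snapshot = snapshots.remove(id);
        out.writeObject(snapshot);
        out.flush();
    }
}
```

        The copy costs memory proportional to the state size, which is the trade-off for letting tuple flow resume before the state has been persisted.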
      • regionCheckpointed

        default void regionCheckpointed(long id)
                                 throws java.lang.Exception
        Called when the whole consistent region has been drained and checkpointed.

        When all operators in a consistent region have finished checkpointing (including non-blocking checkpointing), this method is called to notify start operator(s) of the region. This method is called only for start operator(s) of a consistent region.
        This method was added after the interface was released in Version 4.0. It is defined as a default method for compatibility reasons.

        Parameters:
        id - Id of checkpoint.
        Throws:
        java.lang.Exception - Exception attempting to notify start operator that the region has been drained and checkpointed.
        Since:
        IBM Streams Version 4.2
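
        One plausible use of this notification is for a start operator to tell its external source which data no longer needs to be replayed. The following is a minimal, single-threaded sketch under the assumption that a later reset would restore this cut or a newer one. AckingSourceSketch, its offsets and the simplified checkpoint(long) signature are hypothetical and do not come from the Streams API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical start operator that reads a replayable source by offset. */
class AckingSourceSketch {
    private long nextOffset;          // next record to read from the source
    private long acknowledgedOffset;  // source may discard records below this
    // Read position recorded for each checkpoint id.
    private final Map<Long, Long> offsetAtCheckpoint = new HashMap<>();

    void readOne() { nextOffset++; }

    /** Mirrors checkpoint(Checkpoint): remember the read position for this id. */
    void checkpoint(long id) { offsetAtCheckpoint.put(id, nextOffset); }

    /** Mirrors regionCheckpointed(long): the whole region has persisted this cut. */
    void regionCheckpointed(long id) {
        Long offset = offsetAtCheckpoint.remove(id);
        if (offset != null) {
            acknowledgedOffset = offset; // assumed safe: earlier records are covered by the cut
        }
    }

    long acknowledgedOffset() { return acknowledgedOffset; }
}
```

        Acknowledging only in regionCheckpointed(long), rather than in checkpoint, waits until every operator in the region, including those checkpointing in the background, has persisted its state.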