Interface StateHandler
-
- All Superinterfaces:
- java.lang.AutoCloseable, java.io.Closeable
- All Known Implementing Classes:
- DelegateStateHandler, OrderedStateHandlers, SortedTupleWindow, StatefulWindowListener, TumblingWindowSort
public interface StateHandler extends java.io.Closeable
Handler for operator state. If an Operator is an instance of StateHandler then it is automatically registered as a state handler; otherwise an operator registers an instance of this interface using OperatorContext.registerStateHandler(StateHandler).
Operators that need to manage state are encouraged to always use a StateHandler, either by implementing StateHandler or by registering a state handler.
The SPL Runtime invokes StateHandler methods when:
- checkpointing is enabled.
- the operator is in a consistent region.
StateHandler implementations should be careful to honor the contract of this interface to allow use in future scenarios. For example, no assumptions should be made about the operator being in a consistent region or checkpointing, and tuple submission should only occur from drain().
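The shape of a handler can be sketched as follows. This is a minimal, self-contained illustration rather than the real API: SketchCheckpoint and SketchStateHandler are hypothetical stand-ins that only mirror the abstract methods listed in the method summary, and CountingHandler is an invented example whose only state is a counter.

```java
import java.io.Closeable;
import java.nio.ByteBuffer;

// Hypothetical stand-in for Checkpoint: holds the bytes written for one checkpoint.
final class SketchCheckpoint {
    private byte[] data = new byte[0];

    void write(byte[] bytes) { data = bytes.clone(); }
    byte[] read() { return data.clone(); }
}

// Hypothetical stand-in mirroring the abstract methods in the method summary.
interface SketchStateHandler extends Closeable {
    void drain() throws Exception;
    void checkpoint(SketchCheckpoint checkpoint) throws Exception;
    void reset(SketchCheckpoint checkpoint) throws Exception;
    void resetToInitialState() throws Exception;
    void retireCheckpoint(long id) throws Exception;
}

// Example handler whose only state is a count of tuples seen.
final class CountingHandler implements SketchStateHandler {
    private long count;

    void onTuple() { count++; }
    long count() { return count; }

    // Nothing is buffered, so there is nothing to drain.
    @Override public void drain() { }

    // Persist the counter into the checkpoint.
    @Override public void checkpoint(SketchCheckpoint checkpoint) {
        checkpoint.write(ByteBuffer.allocate(Long.BYTES).putLong(count).array());
    }

    // Restore the counter from a previously written checkpoint.
    @Override public void reset(SketchCheckpoint checkpoint) {
        count = ByteBuffer.wrap(checkpoint.read()).getLong();
    }

    // No checkpoint exists yet, so fall back to the state after initialization.
    @Override public void resetToInitialState() { count = 0; }

    @Override public void retireCheckpoint(long id) { }
    @Override public void close() { }
}
```

The handler makes no assumption about why checkpoint or reset is called, so the same code would serve periodic checkpointing, operator driven checkpointing, or a consistent region.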
When periodic checkpointing is enabled the SPL Runtime will call checkpoint(Checkpoint) for each registered state handler at the configured period.
With operator driven checkpointing the operator calls CheckpointContext.createCheckpoint() to create a checkpoint, which results in a callback to checkpoint(Checkpoint) for all registered handlers. These callbacks occur on the thread that called createCheckpoint().
After PE restart, the SPL Runtime will reset the operator to a persisted state by calling reset(Checkpoint) or resetToInitialState().
In a consistent region, an operator drains its processing and persists its state for the cut point defined by the establishing operator. The SPL Runtime calls drain() and checkpoint(Checkpoint) at the correct times to ensure all operators in a region drain their processing and persist their state, consistent with having seen all tuples and marks up to the point defined by the establishing operator.
An operator becomes consistent with this sequence:
- The operator has processed all tuples and marks on any data (non-control) input ports in the region.
- Draining operator processing using drain() for all state handlers.
- The operator has processed all tuples and marks on any control input ports in the region.
- Persisting state using checkpoint(Checkpoint) for all state handlers.
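The sequence above can be illustrated with a small simulation. The real sequencing is performed by the SPL Runtime; SequencedHandler, RecordingHandler and CutSequenceSketch are hypothetical names used only to make the ordering visible, and iterating handlers in list order is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical stand-in for a state handler; only the two calls in the sequence.
interface SequencedHandler {
    void drain();
    void checkpoint(long id);
}

// Records every callback so the order can be inspected afterwards.
final class RecordingHandler implements SequencedHandler {
    private final String name;
    private final List<String> log;

    RecordingHandler(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override public void drain() { log.add(name + ".drain"); }
    @Override public void checkpoint(long id) { log.add(name + ".checkpoint(" + id + ")"); }
}

final class CutSequenceSketch {
    // Mimics the documented order for one cut: data ports, drain, control ports, checkpoint.
    static void runCut(long id, List<String> log, List<SequencedHandler> handlers) {
        log.add("data ports processed");
        for (SequencedHandler handler : handlers) {
            handler.drain();
        }
        log.add("control ports processed");
        for (SequencedHandler handler : handlers) {
            handler.checkpoint(id);
        }
    }
}
```

Every handler is drained before any handler is checkpointed, which is why a handler can safely submit tuples from drain() without affecting state that another handler has already persisted.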
If the operator code enables non-blocking checkpointing (by invoking ConsistentRegionContext.enableNonBlockingCheckpoint()), the prepareForNonBlockingCheckpoint(long) method is invoked by the SPL Runtime after the drain() method. Tuple flow resumes once prepareForNonBlockingCheckpoint(long) returns. Meanwhile the SPL Runtime delegates an internal thread to execute the checkpoint(Checkpoint) method in the background. Background checkpointing is internally tracked by the SPL Runtime.
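One way a handler might take advantage of this is to copy its state in the prepare step and have the background checkpoint read only that copy. The following is a sketch under that assumption, not a prescribed pattern: SnapshotHandler is a hypothetical class, and its checkpoint() simply returns the data that a real handler would write to the Checkpoint.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical handler that snapshots its state so checkpointing can run in the background.
final class SnapshotHandler {
    // Live state, mutated by tuple processing.
    private final List<String> window = new ArrayList<>();
    // Frozen copy taken for the checkpoint in progress.
    private volatile List<String> snapshot = Collections.emptyList();

    synchronized void onTuple(String tuple) {
        window.add(tuple);
    }

    // Mirrors prepareForNonBlockingCheckpoint(long): runs before tuple flow resumes.
    synchronized void prepareForNonBlockingCheckpoint(long id) {
        snapshot = Collections.unmodifiableList(new ArrayList<>(window));
    }

    // Mirrors checkpoint(Checkpoint): may run concurrently with onTuple,
    // so it reads only the frozen snapshot, never the live window.
    List<String> checkpoint() {
        return snapshot;
    }
}
```

Tuples that arrive after the prepare step change only the live window, so the persisted state still corresponds to the cut.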
For start operators of a consistent region, the regionCheckpointed(long) method is called once all the operators in the consistent region are drained and checkpointed, including those operators performing non-blocking checkpointing.
After a failure in the region, the SPL Runtime will reset the operator to a consistent state by calling reset(Checkpoint) or resetToInitialState().
- Since:
- InfoSphere® Streams Version 4.0
-
-
Method Summary
Modifier and Type    Method and Description
void                 checkpoint(Checkpoint checkpoint)
                     Checkpoint the operator state to support resetting of its state to the saved state, using reset(Checkpoint).
void                 drain()
                     Drain any outstanding processing.
default void         prepareForNonBlockingCheckpoint(long id)
                     Called to prepare for non-blocking checkpoint.
default void         regionCheckpointed(long id)
                     Called when the whole consistent region has been drained and checkpointed.
void                 reset(Checkpoint checkpoint)
                     Reset the operator to a previous state defined by checkpoint.
void                 resetToInitialState()
                     Reset the operator to its initial state.
void                 retireCheckpoint(long id)
                     Called when a checkpoint can be retired.
-
-
-
Method Detail
-
drain
void drain() throws java.lang.Exception
Drain any outstanding processing. Once this method returns, the operator is indicating that it has drained all outstanding processing for all input ports.
The method may:
- Submit tuples to output ports.
- Interact with external systems.
- Wait for background activity to drain outstanding work.
For a consistent region:
- This method is called once all tuples and marks for the current cut for all data (non-control) input ports in the region have been processed, meaning that process and processPunctuation have returned for all tuples and marks.
- A permit is implicitly acquired before the method is called, and released once it returns.
- This method is not called concurrently with tuple or mark processing for any data (non-control) input ports in the region.
- This method may be called concurrently with tuple or mark processing for any autonomous input ports.
- Throws:
java.lang.Exception - Exception attempting to drain state.
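As an illustration of draining outstanding work, the sketch below buffers tuples and submits them only when drained. BatchingHandler is a hypothetical class, and the Consumer passed to it is a stand-in for submitting to an output port; neither is part of the documented API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching handler: tuples are buffered and only submitted from drain().
final class BatchingHandler {
    private final List<String> pending = new ArrayList<>();
    // Stand-in for submitting a tuple to an output port.
    private final Consumer<String> output;

    BatchingHandler(Consumer<String> output) {
        this.output = output;
    }

    synchronized void onTuple(String tuple) {
        pending.add(tuple);
    }

    // Mirrors drain(): once it returns, no buffered tuple is left outstanding.
    synchronized void drain() {
        for (String tuple : pending) {
            output.accept(tuple);
        }
        pending.clear();
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

Because the buffer is empty when drain() returns, a subsequent checkpoint does not need to persist any pending tuples.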
-
reset
void reset(Checkpoint checkpoint) throws java.lang.Exception
Reset the operator to a previous state defined by checkpoint.
When in a consistent region, checkpoint corresponds to the state of a previous successful drain and checkpoint cut cycle for the entire region.
- Throws:
java.lang.Exception - Exception attempting to reset state.
-
checkpoint
void checkpoint(Checkpoint checkpoint) throws java.lang.Exception
Checkpoint the operator state to support resetting of its state to the saved state, using reset(Checkpoint).
For a consistent region this method is called when:
- drain() has completed.
- All background threads have released processing permits.
- All control input ports (if any) have been drained.
- Throws:
java.lang.Exception - Exception attempting to checkpoint state.
-
-
retireCheckpoint
void retireCheckpoint(long id) throws java.lang.Exception
Called when a checkpoint can be retired.
- Parameters:
id - Id of checkpoint.
- Throws:
java.lang.Exception - Exception attempting to retire a checkpoint state.
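A handler that keeps resources of its own per checkpoint could release them here. The sketch below assumes such a handler: ArtifactTrackingHandler and its artifact names are hypothetical, and the id passed to its checkpoint method stands in for the identifier of the real Checkpoint.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical handler that keeps one external artifact (here just a name) per checkpoint id.
final class ArtifactTrackingHandler {
    private final Map<Long, String> artifacts = new TreeMap<>();

    // Mirrors checkpoint(Checkpoint); id stands in for the checkpoint's identifier.
    void checkpoint(long id) {
        artifacts.put(id, "snapshot-" + id + ".bin");
    }

    // Mirrors retireCheckpoint(long): the artifact for that id is no longer needed.
    void retireCheckpoint(long id) {
        artifacts.remove(id);
    }

    Map<Long, String> artifacts() {
        return artifacts;
    }
}
```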
-
resetToInitialState
void resetToInitialState() throws java.lang.Exception
Reset the operator to its initial state.
If the operator is in a consistent region then this is called if a reset occurs prior to the first successful completion of a drain and checkpoint cycle for the region.
This method is not called when the operator first initializes.
- Throws:
java.lang.Exception - Exception attempting to reset the operator to its initial state.
-
prepareForNonBlockingCheckpoint
default void prepareForNonBlockingCheckpoint(long id) throws java.lang.Exception
Called to prepare for non-blocking checkpoint.
This method is executed only when the operator enables non-blocking checkpointing with the ConsistentRegionContext.enableNonBlockingCheckpoint() method. This method is guaranteed to execute after drain() and before checkpoint(Checkpoint). It is also guaranteed to execute before the consistent region resumes tuple processing.
This method was added after the interface was released in Version 4.0. It is defined as a default method for compatibility reasons. The default implementation is sufficient if the operator does not support non-blocking checkpoint.
- Parameters:
id - Id of checkpoint.
- Throws:
java.lang.Exception - Exception attempting to prepare for non-blocking checkpoint.
- Since:
- IBM Streams Version 4.2
-
regionCheckpointed
default void regionCheckpointed(long id) throws java.lang.Exception
Called when the whole consistent region has been drained and checkpointed.
When all operators in a consistent region have finished checkpointing (including non-blocking checkpointing), this method is called to notify start operator(s) of the region. This method is called only for start operator(s) of a consistent region.
This method was added after the interface was released in Version 4.0. It is defined as a default method for compatibility reasons.
- Parameters:
id - Id of checkpoint.
- Throws:
java.lang.Exception - Exception attempting to notify start operator that the region has been drained and checkpointed.
- Since:
- IBM Streams Version 4.2
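A start operator that reads from an external system might use this notification to acknowledge data only once the whole region has persisted the cut. The sketch below is one possible use under stated assumptions: AckingSourceHandler is a hypothetical class, offsets are plain counters, and the id passed to its checkpoint method is assumed to match the id later passed to regionCheckpointed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical source-side handler: remembers the read offset per checkpoint id and
// acknowledges it externally only when the whole region has checkpointed.
final class AckingSourceHandler {
    private long offset;            // number of items read so far
    private long committed = -1;    // last offset acknowledged to the external system
    private final Map<Long, Long> offsetAtCheckpoint = new HashMap<>();

    void onRead() {
        offset++;
    }

    // Mirrors checkpoint(Checkpoint); id stands in for the checkpoint's identifier (assumption).
    void checkpoint(long id) {
        offsetAtCheckpoint.put(id, offset);
    }

    // Mirrors regionCheckpointed(long): every operator in the region has persisted this cut.
    void regionCheckpointed(long id) {
        Long saved = offsetAtCheckpoint.remove(id);
        if (saved != null) {
            committed = saved;
        }
    }

    long committed() {
        return committed;
    }
}
```

Deferring the acknowledgement until this callback means a reset of the region never needs data that the external system has already discarded.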
-
-