Interface ConsistentRegionContext
-
- All Superinterfaces:
- OptionalContext
public interface ConsistentRegionContext extends OptionalContext
Context information for the consistent region the operator is in. A consistent region is declared using the SPL consistent annotation.
In a consistent region an establishing operator establishes points where the region becomes consistent. A region is consistent if all operators in the region have state that reflects having completely processed all streams derived from the establishing operator's output streams.
Thus the establishing operator creates a cut point where it requires that the region becomes consistent, by ensuring all operators in the region have processed all of its output streams (tuples and punctuation marks); that is, the tuples and marks have completely flowed through the region. A cut may be defined by the establishing operator, such as at a file boundary, or periodically, such as every five seconds.
To establish the consistent state the establishing operator stops submitting tuples and then calls makeConsistent().
Each downstream operator in the region becomes consistent by draining its processing for all its input streams and then checkpointing its state.
Once all operators in the region have made themselves consistent, the region has reached a consistent state.
Operators may have input and output ports that are outside of the consistent region, in which case these ports are not involved in becoming consistent. An operator in a consistent region will have at least one input or output port that is part of the region.
Any operator in a consistent region must acquire a permit before submitting tuples to output ports in the region. Only background activity needs to explicitly acquire and release permits; see getSubmissionSemaphore() for details on permit handling.
Consistent Region limitations in InfoSphere® Streams Version 4.0
- The mock test framework for Java primitives does not support testing of consistent region functionality.
- Since:
- InfoSphere® Streams Version 4.0
- See Also:
OperatorContext.getOptionalContext(Class)
-
-
Method Summary
Methods
Modifier and Type, Method and Description
- void acquirePermit()
  Acquire a single submission permit.
- javax.management.ObjectName getConsistentRegionMXBeanName()
  Get the object name of the ConsistentRegionMXBean MBean registered with the Job Control Plane.
- double getDrainTimeout()
  Get the drain timeout of the consistent region.
- int getIndex()
  Get the region index.
- java.util.Set<StreamingInput<?>> getInputPorts()
  Return the set of this operator's input ports that are part of this consistent region.
- java.lang.Runnable getPermitRunnable(java.lang.Runnable task)
  Return a task that wraps task with permit acquisition and release for this region.
- int getResetAttempt()
  This method returns the current number of attempts of resetting a consistent region.
- double getResetTimeout()
  Get the reset timeout of the consistent region.
- long getSequenceId()
  This method returns the current sequence identifier of a consistent region.
- java.util.concurrent.Semaphore getSubmissionSemaphore()
  Get the semaphore used to control processing and tuple submission.
- boolean isEndOfRegion()
  Check if this operator is an end of a consistent region.
- boolean isStartOfRegion()
  Check if this operator is a start of a consistent region.
- boolean isTriggerOperator()
  Check if the operator is the trigger operator of a consistent cut region.
- boolean makeConsistent()
  Make the region consistent by draining processing and checkpointing state.
- void releasePermit()
  Release a single submission permit.
- void reset()
  Reset this consistent region to its last consistent state.
-
-
-
Method Detail
-
isStartOfRegion
boolean isStartOfRegion()
Check if this operator is a start of a consistent region.
- Returns:
- True if this operator is a start of a consistent region, false otherwise.
-
isEndOfRegion
boolean isEndOfRegion()
Check if this operator is an end of a consistent region.
- Returns:
- True if this operator is an end of a consistent region, false otherwise.
-
isTriggerOperator
boolean isTriggerOperator()
Check if the operator is the trigger operator of a consistent cut region.
- Returns:
- True if the operator is the trigger operator of a consistent region, false otherwise.
-
getInputPorts
java.util.Set<StreamingInput<?>> getInputPorts()
Return the set of this operator's input ports that are part of this consistent region.
-
getIndex
int getIndex()
Get the region index.
- Returns:
- region index
-
getConsistentRegionMXBeanName
javax.management.ObjectName getConsistentRegionMXBeanName()
Get the object name of the ConsistentRegionMXBean MBean registered with the Job Control Plane. The object name can be used to subscribe to consistent region related notifications.
- Returns:
- Consistent region MBean object name
-
getSequenceId
long getSequenceId()
This method returns the current sequence identifier of a consistent region. The sequence identifier is the id that a drain or a reset should be associated with. After operator startup, this method returns 1. On operator restart, the method returns -1 until it resets.
While a submission permit is held the returned value is stable.
- Returns:
- current sequence id of a drain or reset or -1 if the method is accessed after the operator restarts due to a PE crash but before the operator resets.
-
getResetAttempt
int getResetAttempt()
This method returns the current number of attempts of resetting a consistent region. If a drain is completed after a reset, this method returns -1.
- Returns:
- last reset attempt, or -1 if a drain has been successfully processed after a reset.
-
makeConsistent
boolean makeConsistent()
Make the region consistent by draining processing and checkpointing state.
makeConsistent() is called by start operators to establish a point in time when the region was consistent. Normally the region becomes consistent by each operator draining all processing and checkpointing its state, in which case this method returns true.
If a failure occurs, then the region becomes consistent by resetting to a previous consistent state, in which case the method returns false.
The triggering operator calls makeConsistent() to start the drain and checkpoint cycle.
With periodic triggering of drain and checkpoint cycles, start operators may also call makeConsistent() to start the drain and checkpoint cycle, though the start may be delayed until the next scheduled cycle.
A drain and checkpoint cycle results in a call to StateHandler.drain() followed by StateHandler.checkpoint(com.ibm.streams.operator.state.Checkpoint).
This call blocks until either the drain and checkpoint cycle has completed for the region or, if a failure occurred, the region has been reset.
When true is returned it is guaranteed that the cut identified by the value of getSequenceId() upon entry was completed without any reset. Thus any tuples submitted prior to this call or prior to the return from StateHandler.drain() have been completely processed by the region and the resultant operators' state checkpointed.
When false is returned it is guaranteed that the cut identified by the value of getSequenceId() upon entry was reset. Tuples submitted during that cut should be resubmitted to ensure they are processed.
If a drain and checkpoint cycle is not required or cannot be completed due to an ongoing reset, then it may not be started. In this case the drain() and checkpoint methods may not be called, though the return value will represent the correct guarantee.
For correct behavior, upon entry the caller must have acquired (implicitly or explicitly) a single permit.
This permit is released during the method's processing but will be re-acquired before the method returns. Thus, after the method returns, the caller still holds a permit and is free to continue to submit tuples. The current sequence identifier most likely will have changed.
After the method returns the state of the operator may have changed, either due to a reset or concurrent processing by other threads.
makeConsistent() happens-before its call to StateHandler.drain().
- Returns:
true if the region became consistent without any reset, false if the region was reset.
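The return-value contract above can be sketched as a resubmission loop. This is a minimal illustration only: RegionStub and BatchSource are hypothetical stand-ins (not part of the Streams API), the submit method merely records tuples in a list, and permit handling and StateHandler callbacks are omitted. A real start operator would typically resubmit from the position restored by its reset state rather than replaying an in-memory batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the one ConsistentRegionContext method used here. */
interface RegionStub {
    boolean makeConsistent();
}

/** Sketch of a start operator that submits a batch and then establishes a cut. */
class BatchSource {
    private final RegionStub region;
    private final List<String> submitted = new ArrayList<>();
    private int cutAttempts = 0;

    BatchSource(RegionStub region) {
        this.region = region;
    }

    /** Stand-in for tuple submission to an output port in the region. */
    private void submit(String tuple) {
        submitted.add(tuple);
    }

    /** Submits the batch, resubmitting it for as long as the cut is reset. */
    void submitBatch(List<String> batch) {
        boolean consistent;
        do {
            for (String tuple : batch) {
                submit(tuple);
            }
            cutAttempts++;
            // true: the cut was drained and checkpointed.
            // false: the cut was reset, so its tuples must be resubmitted.
            consistent = region.makeConsistent();
        } while (!consistent);
    }

    int cutAttempts() {
        return cutAttempts;
    }

    List<String> submitted() {
        return submitted;
    }
}
```

The loop retries without a bound for brevity; an operator could also consult getResetAttempt() to decide how to react to repeated resets.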
-
reset
void reset() throws java.io.IOException
Reset this consistent region to its last consistent state.
This method can be used by operators when detecting an operator-specific transient failure. If the region is already being reset, a new attempt for reset is requested.
- Throws:
java.io.IOException - Reset cannot be requested.
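As a rough sketch of the transient-failure case described above, an operator might wrap an external call and request a reset when it fails. ResettableRegion, ExternalCall and TransientFailureHandler are hypothetical names introduced for illustration; only the reset() signature mirrors this interface, and treating every IOException as transient is an assumption of the sketch.

```java
import java.io.IOException;

/** Hypothetical stand-in for the one ConsistentRegionContext method used here. */
interface ResettableRegion {
    void reset() throws IOException;
}

/** Hypothetical external interaction that may fail transiently. */
interface ExternalCall {
    void run() throws IOException;
}

class TransientFailureHandler {
    /**
     * Runs the call and requests a region reset if it fails.
     * Returns true if the call succeeded, false if a reset was requested.
     */
    static boolean runOrReset(ExternalCall call, ResettableRegion region) throws IOException {
        try {
            call.run();
            return true;
        } catch (IOException transientFailure) {
            // Assumption: the failure is transient, so roll the region back
            // to its last consistent state instead of failing the operator.
            // reset() may itself throw IOException if the reset cannot be requested.
            region.reset();
            return false;
        }
    }
}
```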
-
getSubmissionSemaphore
java.util.concurrent.Semaphore getSubmissionSemaphore()
Get the semaphore used to control processing and tuple submission. A permit from this semaphore is required to modify state and submit tuples to output ports in this region.
A permit is implicitly acquired by the SPL Runtime during calls to any of these methods (and released once the method completes):
- Input port processing:
- Window event handling:
- State handling:
When a permit is implicitly acquired, operator implementations need not code any permit acquisition or release. Thus only background activity that needs to partake in the consistent region, such as threads in a source operator, must acquire a permit. A typical pattern for a source operator submitting tuples would be:

    semaphore.acquire();
    try {
        // get external data for tuples
        OutputTuple tuple = ...
        // update state to reflect tuple was submitted
        ...
        // submit tuples (can be one or many)
        getOutput(0).submit(tuple);
    } finally {
        semaphore.release();
    }
- Returns:
- Semaphore used to acquire permits for tuple submission to output ports in this region.
- See Also:
acquirePermit(), releasePermit()
-
acquirePermit
void acquirePermit() throws java.lang.InterruptedException
Acquire a single submission permit. Convenience method equivalent to getSubmissionSemaphore().acquire().
- Throws:
java.lang.InterruptedException
-
releasePermit
void releasePermit()
Release a single submission permit. Convenience method equivalent to getSubmissionSemaphore().release().
- Throws:
java.lang.InterruptedException
-
getPermitRunnable
java.lang.Runnable getPermitRunnable(java.lang.Runnable task)
Return a task that wraps task with permit acquisition and release for this region. The permit is acquired before calling task.run() and released before the returned Runnable's run() method returns, even if an undeclared exception is thrown.
- Parameters:
task - Task to be wrapped.
- Returns:
task wrapped in a Runnable that acquires and releases a permit.
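To make the acquire and release guarantee concrete, a wrapper with the same observable behavior could look roughly like the following. This is a sketch built on java.util.concurrent.Semaphore, not the SPL Runtime's implementation; PermitRunnable is a hypothetical class name, and skipping the task when interrupted while waiting for a permit is an assumption of the sketch.

```java
import java.util.concurrent.Semaphore;

/** Sketch of a permit-wrapping Runnable; not the SPL Runtime's implementation. */
class PermitRunnable implements Runnable {
    private final Semaphore permits;
    private final Runnable task;

    PermitRunnable(Semaphore permits, Runnable task) {
        this.permits = permits;
        this.task = task;
    }

    @Override
    public void run() {
        try {
            // Acquire the permit before the wrapped task runs.
            permits.acquire();
        } catch (InterruptedException e) {
            // Assumption: skip the task if interrupted while waiting,
            // and restore the thread's interrupt status.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            task.run();
        } finally {
            // Release even if the task throws an unchecked exception.
            permits.release();
        }
    }
}
```

In an operator, the returned Runnable from getPermitRunnable(task) would be handed to whatever executor or thread runs the background work, so the task body itself needs no explicit permit code.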
-
getDrainTimeout
double getDrainTimeout()
Get the drain timeout of the consistent region.
- Returns:
- drain timeout
- Since:
- InfoSphere® Streams Version 4.0.1
-
getResetTimeout
double getResetTimeout()
Get the reset timeout of the consistent region.
- Returns:
- reset timeout
- Since:
- InfoSphere® Streams Version 4.0.1
-
-