Primitive operators that start consistent regions
If a start operator is in a periodic consistent region and it completes submitting output tuples, draining and resetting of the consistent region continues until the job finishes.
Consistent region permits
In general, start operators of a consistent region are source operators. In C++, source operators must implement the process(uint32_t) method, which is the function that is called by the InfoSphere® Streams instance to run the main logic of a thread. When the operator is outside of a consistent region, the operator can submit tuples at any time. When the operator is in a consistent region, the operator must pause tuple submission every time a drain or a reset occurs.
Code your operator so that drains and resets happen at natural points in its execution, such as after the operator changes its internal state and submits a tuple, or after a more elaborate operation, such as reaching the end of a file that it is reading. After you define these natural points, guard each such code block with a tuple submission permit that is specific to the consistent region.
A consistent region permit ensures that drains and resets do not occur while an operator thread is running the guarded code block. Multiple threads can hold permits at the same time. A drain or reset occurs only after the operator receives the corresponding notification from the consistent region controller and all operator threads release their permits. After that, the InfoSphere Streams instance calls the StateHandler methods (drain, checkpoint, and reset), and starts draining or resetting the downstream operators of the region.
If any thread attempts to acquire a permit when a checkpoint or reset is in progress, the thread blocks. A thread also blocks on a permit when the operator restarts and has not yet reset. The thread is granted a permit, that is, unblocks, after a consistent state is established or reset successfully and the consistent region controller notifies that tuple submission can be resumed.
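The blocking rules above can be pictured with a small standalone model. This is only a sketch under stated assumptions: PermitGate and all of its methods are hypothetical names invented for illustration, built on the C++ standard library, and they are not part of the SPL runtime API. The model tracks how many threads hold a permit, refuses new permits while a drain or reset is in progress, and lets the drain or reset proceed only when no permit is held.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical model of permit bookkeeping; NOT the SPL runtime API.
class PermitGate {
public:
    // Blocks while a drain or reset is in progress, then registers a holder.
    void acquirePermit() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !paused_; });
        ++holders_;
    }

    // Non-blocking variant, added only so the model can be probed from one thread.
    bool tryAcquirePermit() {
        std::lock_guard<std::mutex> lk(m_);
        if (paused_) return false;
        ++holders_;
        return true;
    }

    void releasePermit() {
        std::lock_guard<std::mutex> lk(m_);
        --holders_;
        cv_.notify_all();
    }

    // Models the runtime side: stop granting permits, then wait for all holders.
    void beginDrainOrReset() {
        std::unique_lock<std::mutex> lk(m_);
        paused_ = true;
        cv_.wait(lk, [this] { return holders_ == 0; });
    }

    // Models the controller notification that tuple submission can resume.
    void resumeSubmission() {
        std::lock_guard<std::mutex> lk(m_);
        paused_ = false;
        cv_.notify_all();
    }

    int holders() const {
        std::lock_guard<std::mutex> lk(m_);
        return holders_;
    }

private:
    mutable std::mutex m_;
    std::condition_variable cv_;
    int holders_ = 0;
    bool paused_ = false;
};
```

In this model, two threads can hold permits at once, and a thread that calls acquirePermit() between beginDrainOrReset() and resumeSubmission() waits until submission is resumed.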
In C++, an operator can acquire and release a permit in two ways:
- Calling a paired ConsistentRegionContext::acquirePermit() and ConsistentRegionContext::releasePermit()
- Defining a scope with a ConsistentRegionPermit, which is an RAII handler for the permit acquisition and release methods in the ConsistentRegionContext
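The difference between the two styles shows up mainly when the guarded code can throw. The following sketch compares them on a mock context. MockRegionContext, ScopedPermit, and the two submit functions are hypothetical stand-ins written for this illustration; they mimic the shape of the acquire and release calls but are not the SPL classes, and the real ConsistentRegionPermit is constructed from a context pointer rather than a reference.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for a consistent region context; it only counts permits.
struct MockRegionContext {
    int held = 0;      // permits currently held
    int acquired = 0;  // total permits ever acquired
    void acquirePermit() { ++held; ++acquired; }
    void releasePermit() { --held; }
};

// Hypothetical RAII guard in the spirit of ConsistentRegionPermit.
class ScopedPermit {
public:
    explicit ScopedPermit(MockRegionContext& ctx) : ctx_(ctx) { ctx_.acquirePermit(); }
    ~ScopedPermit() { ctx_.releasePermit(); }
    ScopedPermit(const ScopedPermit&) = delete;
    ScopedPermit& operator=(const ScopedPermit&) = delete;
private:
    MockRegionContext& ctx_;
};

// Paired calls: every exit path must release the permit explicitly.
void submitWithPairedCalls(MockRegionContext& ctx, bool fail) {
    ctx.acquirePermit();
    try {
        if (fail) throw std::runtime_error("submission failed");
        // ... tuple submission and state update would go here ...
    } catch (...) {
        ctx.releasePermit();
        throw;
    }
    ctx.releasePermit();
}

// Scoped permit: the destructor releases the permit on every exit path.
void submitWithScope(MockRegionContext& ctx, bool fail) {
    ScopedPermit permit(ctx);
    if (fail) throw std::runtime_error("submission failed");
    // ... tuple submission and state update would go here ...
}
```

Both functions leave no permit held after they return or throw, but the scoped version gets that guarantee without the extra try/catch.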
A Java™ primitive operator can acquire and release permits by calling acquirePermit() and releasePermit() from a ConsistentRegionContext. Releasing a permit is an indication that a consistent state can be correctly established or reset. If an unrecoverable error happens when a permit is being held, the operator must explicitly indicate that a consistent region must reset via the ConsistentRegionContext.
A thread can block when it tries to acquire a lock, so be mindful of where the code acquires locks to avoid deadlocks. If a thread must acquire a lock, acquire it inside the code block that runs with a permit. Otherwise, the thread might hold the lock while it blocks waiting for a permit, and any other thread that needs the same lock cannot acquire it until the drain or reset is complete. One such thread is the SPL runtime thread, which cannot acquire the lock when it calls checkpoint or reset.
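The lock ordering can be illustrated with a standalone model in which one thread plays the operator and another plays the runtime. Everything here is an assumption made for the sketch: PermitGate, SourceModel, runSource, and checkpointIsConsistent are hypothetical names, and the code uses only the C++ standard library rather than the SPL runtime. The operator thread takes the permit first and the state lock second, so it never holds the lock while it waits for a permit.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical model of the permit mechanism; NOT the SPL runtime API.
class PermitGate {
public:
    void acquirePermit() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !paused_; });
        ++holders_;
    }
    void releasePermit() {
        std::lock_guard<std::mutex> lk(m_);
        --holders_;
        cv_.notify_all();
    }
    void pauseAndWait() {  // stop granting permits, wait until none is held
        std::unique_lock<std::mutex> lk(m_);
        paused_ = true;
        cv_.wait(lk, [this] { return holders_ == 0; });
    }
    void resume() {
        std::lock_guard<std::mutex> lk(m_);
        paused_ = false;
        cv_.notify_all();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int holders_ = 0;
    bool paused_ = false;
};

struct SourceModel {
    PermitGate gate;
    std::mutex stateMutex;  // protects counter and submitted
    long counter = 0;
    long submitted = 0;
};

// Operator thread: permit first, then the state lock inside the permit scope.
void runSource(SourceModel& s, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        s.gate.acquirePermit();
        {
            std::lock_guard<std::mutex> lock(s.stateMutex);
            ++s.submitted;  // stands in for submit(tuple, 0)
            ++s.counter;    // state update
        }                   // lock is released before the permit
        s.gate.releasePermit();
    }
}

// Modeled runtime thread: waits for permits, then takes the same lock.
bool checkpointIsConsistent(SourceModel& s) {
    s.gate.pauseAndWait();
    bool ok;
    {
        std::lock_guard<std::mutex> lock(s.stateMutex);
        ok = (s.counter == s.submitted);
    }
    s.gate.resume();
    return ok;
}
```

If runSource took stateMutex before calling acquirePermit(), it could block on the permit while holding the lock, and checkpointIsConsistent would then wait on that lock indefinitely.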
Sample start operator
This example concerns only source operators that start a consistent region. The same principles that are used for acquiring permits can be applied to primitive operators that do not start a region but have background threads that do tuple submission.
When the StartOfRegion operator in this example is not in a consistent region, it behaves as follows on failures:
- On the failure of other operators, the StartOfRegion operator continues submitting new tuples with attribute values that increase by one. Because tuples are sometimes not replayed, you might not see consecutive numbers in the output.
- On the failure of the operator itself, the StartOfRegion operator loses the value of the last submitted tuple. In this case, the operator restarts submitting from 0, so the whole application processes the data again from the beginning.
Header file
<%
    # Look up the optional consistent region context; it is undefined
    # when the operator is not in a consistent region.
    my $crContext =
        $model->getContext()->getOptionalContext("ConsistentRegion");
    my @includes;
    my $isTriggerOperator = 0;
    if ($crContext) {
        # Include the consistent region headers only when they are needed.
        push @includes,
            "#include <SPL/Runtime/Operator/State/StateHandler.h>",
            "#include <SPL/Runtime/Operator/State/ConsistentRegionContext.h>";
        $isTriggerOperator = $crContext->isTriggerOperator();
    }
    if ($isTriggerOperator) {
        SPL::CodeGen::errorln("Operator does not support trigger=operatorDriven in a consistent region.",
            $model->getContext()->getSourceLocation());
    }
    SPL::CodeGen::headerPrologue($model, \@includes);
%>
The rest of the header file for this operator is similar to the header file for the StatefulPrimitive operator. The only difference is that it includes a new member variable that is named _crContext, of type pointer to a ConsistentRegionContext. To view the header file for that operator, see Stateful primitive operators that participate in consistent regions.
C++ file
Similar to the header file, the C++ file uses the code generation API to conditionally compile code that is used only when the operator is in a consistent region.
The process method acquires a ConsistentRegionPermit for two reasons:
- To protect tuple submission.
- To bind the tuple submission with the state update in a single operation.
By binding the tuple submission code to the state update code, the operator ensures that both operations occur as a single unit with respect to a drain or a reset. This coupling means that when the operator checkpoints or resets its state, the state reflects the fact that both operations occurred. If these operations were not coupled, the operator code would need to handle different reset situations, for example, a reset in which the operator submitted the tuple but did not update its internal state. This issue arises because a drain or a reset can occur at any time that the operator code is not running with an acquired permit.
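void MY_OPERATOR::process(uint32_t)
{
    ProcessingElement& pe = getPE();
    while (!pe.getShutdownRequested()) {
<%if ($isInConsistentRegion) {%>
        {
            // Hold a permit for this scope; drains and resets wait until it is released.
            ConsistentRegionPermit crp(_crContext);
<%}%>
            {
                // Take the state lock inside the permit scope, then submit
                // the tuple and update the counter as one unit.
                AutoMutex am(_mutex);
                OPort0Type tuple(_counter);
                submit(tuple, 0);
                _counter++;
            }
<%if ($isInConsistentRegion) {%>
        }
<%}%>
        pe.blockUntilShutdownRequest(0.2);
    }
}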
Any primitive operator that has background tuple submission threads can use the ConsistentRegionPermit as in the previous code sample to pause and resume submission during a drain or reset.
This example does not show the implementation of the operator state checkpoint and reset methods because those methods are similar to the implementation that is described in Stateful primitive operators that participate in consistent regions. For information about how to create a sample application that uses the StartOfRegion primitive operator, see the sample::ReplayableSource sample application at $STREAMS_INSTALL/samples/spl/feature/ConsistentRegion/sample/. A sample Java primitive operator that acquires consistent region permits can be found in com.ibm.streams.operator.samples.sources.SystemPropertySource.
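The reason for coupling tuple submission and the state update under one permit can be checked with a toy, single-threaded model. This is a sketch under stated assumptions: ToySource, Snapshot, and the functions below are hypothetical and do not use the SPL runtime. A drain is modeled as a snapshot that can be taken at any point where no permit is held, and the model compares one iteration that holds a single permit across both operations with one that releases the permit between them.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical single-threaded model; NOT the SPL runtime API.
struct Snapshot {
    long counter;           // operator state seen by the drain
    std::size_t submitted;  // number of tuples submitted so far
};

struct ToySource {
    long counter = 0;
    std::vector<long> submitted;
    std::vector<Snapshot> snapshots;
    int permitsHeld = 0;

    // Models a drain that arrives at this instant; it can run only
    // when no permit is held.
    void drainPoint() {
        if (permitsHeld == 0) {
            snapshots.push_back({counter, submitted.size()});
        }
    }

    // Submission and state update under one permit.
    void coupledIteration() {
        ++permitsHeld;
        submitted.push_back(counter);
        ++counter;
        --permitsHeld;
        drainPoint();
    }

    // Submission and state update under separate permits.
    void splitIteration() {
        ++permitsHeld;
        submitted.push_back(counter);
        --permitsHeld;
        drainPoint();  // a drain can land between the two operations
        ++permitsHeld;
        ++counter;
        --permitsHeld;
        drainPoint();
    }
};

// True if every snapshot saw the same number of submissions and state updates.
bool allSnapshotsConsistent(const ToySource& s) {
    for (const Snapshot& snap : s.snapshots) {
        if (static_cast<std::size_t>(snap.counter) != snap.submitted) return false;
    }
    return true;
}
```

In the coupled variant every snapshot sees matching counts, while the split variant records snapshots in which a tuple was submitted but the counter was not yet updated.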