com.ibm.streams.operator.state

Interface ConsistentRegionContext

  • All Superinterfaces:
    OptionalContext


    public interface ConsistentRegionContext
    extends OptionalContext
    Context information for the consistent region the operator is in. A consistent region is declared using the SPL consistent annotation

    In a consistent region an establishing operator establishes points where the region becomes consistent. A region is consistent if all operators in the region have state that reflects having completely processed all streams derived from the establishing operator's output streams.
    Thus the establishing operator creates a cut point where it requires that the region becomes consistent, by ensuring all operators in the region have processed all of its output streams (tuples and punctuation marks), effectively the tuples and marks have completely flowed through the region. A cut may be defined by the establishing operator, such as at a file boundary, or periodically such as every five seconds.
    To establish the consistent state the establishing operator stops submitting tuples and then calls makeConsistent().
    Each downstream operator in the region becomes consistent by draining its processing for all its input streams and then checkpointing its state.
    Once all operators in the region have made themselves consistent, the region has reached a consistent state.

    Operators may have input and output ports that are outside of the consistent region, in which case these ports are not involved in becoming consistent.
    An operator in a consistent region will have at least one input or output port that is part of the region.

    Any operator in a consistent region must acquire a permit before submitting tuples to output ports in the region. Only background activity needs to explicitly acquire and release permits, see getSubmissionSemaphore() for details on permit handling.

    Consistent Region limitations in InfoSphere® Streams Version 4.0

    • The mock test framework for Java primitives does not support testing of consistent region functionality

    Since:
    InfoSphere® Streams Version 4.0
    See Also:
    OperatorContext.getOptionalContext(Class)
    • Method Summary

      Methods 
      Modifier and Type Method and Description
      void acquirePermit()
      Acquire a single submission permit.
      javax.management.ObjectName getConsistentRegionMXBeanName()
      Get the object name of the ConsistentRegionMXBean MBean registered with the Job Control Plane.
      double getDrainTimeout()
      Get the drain timeout of the consistent region.
      int getIndex()
      Get the region index.
      java.util.Set<StreamingInput<?>> getInputPorts()
      Return the set of this operators input ports that are part of this consistent region.
      java.lang.Runnable getPermitRunnable(java.lang.Runnable task)
      Return a task that wraps task with permit acquisition and release for this region.
      int getResetAttempt()
      This method returns the current number of attempts of resetting a consistent region.
      double getResetTimeout()
      Get the reset timeout of the consistent region.
      long getSequenceId()
      This method returns the current sequence identifier of a consistent region.
      java.util.concurrent.Semaphore getSubmissionSemaphore()
      Get the semaphore used to control processing and tuple submission.
      boolean isEndOfRegion()
      Check if this operator is an end of a consistent region.
      boolean isStartOfRegion()
      Check if this operator is a start of a consistent region.
      boolean isTriggerOperator()
      Check if the operator is the trigger operator of a consistent cut region.
      boolean makeConsistent()
      Make the region consistent by draining processing and checkpointing state.
      void releasePermit()
      Release a single submission permit.
      void reset()
      Reset this consistent region to its last consistent state.
    • Method Detail

      • isStartOfRegion

        boolean isStartOfRegion()
        Check if this operator is a start of a consistent region.
        Returns:
        True if this operator is a start of a consistent region, false otherwise.
      • isEndOfRegion

        boolean isEndOfRegion()
        Check if this operator is an end of a consistent region.
        Returns:
        True if this operator is an end of a consistent region, false otherwise.
      • isTriggerOperator

        boolean isTriggerOperator()
        Check if the operator is the trigger operator of a consistent cut region.
        Returns:
        True if the operator is the trigger operator of a consistent region, false otherwise.
      • getInputPorts

        java.util.Set<StreamingInput<?>> getInputPorts()
        Return the set of this operators input ports that are part of this consistent region.
      • getIndex

        int getIndex()
        Get the region index.
        Returns:
        region index
      • getConsistentRegionMXBeanName

        javax.management.ObjectName getConsistentRegionMXBeanName()
        Get the object name of the ConsistentRegionMXBean MBean registered with the Job Control Plane. The object name can be used to subscribe to consistent region related notifications.
        Returns:
        Consistent region MBean object name
      • getSequenceId

        long getSequenceId()
        This method returns the current sequence identifier of a consistent region.

        The sequence identifier is the id that a drain or a reset should be associated to. After operator startup, this method returns 1. On operator restart, the method returns -1 until it reset.

        While a submission permit is held the returned value is stable.

        Returns:
        current sequence id of a drain or reset or -1 if the method is accessed after the operator restarts due to a PE crash but before the operator resets.
      • getResetAttempt

        int getResetAttempt()
        This method returns the current number of attempts of resetting a consistent region.

        If a drain is completed after a reset, this method returns -1.

        Returns:
        last reset attempt, or -1 if a drain has been successfully processed after a reset.
      • makeConsistent

        boolean makeConsistent()
        Make the region consistent by draining processing and checkpointing state.
        makeConsistent() is called by start operators to establish a point in time when the region was consistent. Normally the region becomes consistent by each operator draining all processing and checkpointing its state, in the case this method returns true.
        If a failure occurs, then the region becomes consistent by resetting to a previous consistent state, in this case the method returns false.

        The triggering operator calls makeConsistent() to start the drain and checkpoint cycle.
        With periodic triggering of drain and checkpoint cycles, start operators may also call makeConsistent() to start the drain and checkpoint cycle, though the start may be delayed until the next scheduled cycle.

        A drain and checkpoint cycle results in a call to StateHandler.drain() followed by StateHandler.checkpoint(com.ibm.streams.operator.state.Checkpoint).
        This call blocks until either the drain and and checkpoint cycle was completed for the region, or until the region is reset, if a failure occurred.
        When true is returned it is guaranteed that the cut identified by the value of getSequenceId() upon entry was completed without any reset. Thus any tuples submitted prior to this call or prior to the return from StateHandler.drain() have been completely processed by the region and the resultant operators' state checkpointed.
        When false is returned it is guaranteed that the cut identified by the value of getSequenceId() upon entry was reset. Tuples submitted during that cut should be resubmitted to ensure they are processed.
        If a drain and checkpoint cycle is not required or cannot be completed due to an ongoing reset, then it may not be started. In this case then the drain() and checkpoint methods may not be called, though the return value will represent the correct guarantee.

        For correct behavior, upon entry the caller must have acquired (implicitly or explicitly) a single permit.
        This permit is released during the method's processing but will be re-acquired before the method returns. After the method returns, thus the caller still has a permit and thus is free to continue to submit tuples. The current sequence identifier most likely will have changed.

        After the method returns the state of the operator may have changed, either due to a reset or concurrent processing by other threads.

        makeConsistent() happens-before its call to StateHandler.drain().

        Returns:
        true Region became consistent without any reset, false region was reset.
      • reset

        void reset()
                   throws java.io.IOException
        Reset this consistent region to its last consistent state.
        This method can be used by operators when detecting an operator-specific transient failure. If the region is already being reset, a new attempt for reset is requested.
        Throws:
        IOException - Reset can not be requested.
      • getSubmissionSemaphore

        java.util.concurrent.Semaphore getSubmissionSemaphore()
        Get the semaphore used to control processing and tuple submission. A permit from this semaphore is required to modify state and submit tuples to output ports in this region.
        A permit is implicitly acquired by the SPL Runtime during calls to any of these methods (and released once the method completes):
        When a permit is implicitly acquired, operator implementations need not code any permit acquisition or release, thus only background activity, such a threads in a source operator, that needs to partake in the consistent region must acquire a permit. A typical pattern for a source operator submitting tuples would be:
         semaphore.acquire();
         try {
           //get external data for tuples
           OutputTuple tuple = ...
           // update state to reflect tuple was submitted
           ...
           // submit tuples (can be one or many)
           getOutput(0).submit(tuple);
         } finally {
            semaphore.release();
         }
         
        Returns:
        Semaphore used to acquire permits for tuple submission to output ports in this region.
        See Also:
        acquirePermit(), releasePermit()
      • acquirePermit

        void acquirePermit()
                           throws java.lang.InterruptedException
        Acquire a single submission permit. Convenience method equivalent to getSemaphore().acquire()
        Throws:
        java.lang.InterruptedException
      • releasePermit

        void releasePermit()
        Release a single submission permit. Convenience method equivalent to getSemaphore().release()
        Throws:
        java.lang.InterruptedException
      • getPermitRunnable

        java.lang.Runnable getPermitRunnable(java.lang.Runnable task)
        Return a task that wraps task with permit acquisition and release for this region. The permit is acquired before calling task.run() and released before the returned Runnable run() method returns, even if an undeclared exception is thrown.
        Parameters:
        task - Task to be wrapped.
        Returns:
        task wrapped in a Runnable that acquires and releases a permit.
      • getDrainTimeout

        double getDrainTimeout()
        Get the drain timeout of the consistent region.
        Returns:
        drain timeout
        Since:
        InfoSphere® Streams Version 4.0.1
      • getResetTimeout

        double getResetTimeout()
        Get the reset timeout of the consistent region.
        Returns:
        reset timeout
        Since:
        InfoSphere® Streams Version 4.0.1