Package com.ibm.streams.operator.window

Input port windowing interfaces and classes.


Package com.ibm.streams.operator.window Description

Input port windowing interfaces and classes. A window is a well-defined set of tuples that have arrived at the input port. The contents of a window are determined by its eviction policy. A window is described by StreamWindow; see its documentation for the definitions of tumbling and sliding windows.
A window's definition is declared either by the invocation of the operator in an SPL application or, in a Java-declared flow, by the window configuration of the operator's InputPortDeclaration (see tumbling() or sliding()).

A window generates events, represented as StreamWindowEvent instances. An operator handles these events with an implementation of StreamWindowListener, registered using StreamWindow.registerListener(StreamWindowListener, boolean). Each event has a specific StreamWindowEvent.Type; the most significant event types are:

  • StreamWindowEvent.Type.INSERTION - One or more tuples have been inserted into the window.
  • StreamWindowEvent.Type.EVICTION
    • For a tumbling window, the window has tumbled and all of its tuples are being evicted. This is typically where tuples are submitted, representing some aggregation of the window, that is, of the set of tuples being evicted.
    • For a sliding window, the window has slid: one or more tuples are being evicted, as defined by the eviction policy.
  • StreamWindowEvent.Type.TRIGGER - A sliding window is being triggered according to its trigger policy. This is typically where tuples are submitted, representing some aggregation of the window, that is, of the set of tuples currently in the window.
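To make the event types concrete, here is a minimal, self-contained sketch of a listener for a tumbling window of integer tuples that submits the sum of each tumble. It does not use the com.ibm.streams.operator.window API: `EventType`, `WindowEvent` and `TumblingSumListener` are hypothetical stand-ins for StreamWindowEvent.Type, StreamWindowEvent and a StreamWindowListener implementation, and the `submitted` list stands in for an output port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamWindowEvent.Type (only the types discussed above).
enum EventType { INSERTION, EVICTION, TRIGGER }

// Hypothetical stand-in for StreamWindowEvent: an event type plus the tuples it concerns.
final class WindowEvent {
    final EventType type;
    final List<Integer> tuples;

    WindowEvent(EventType type, List<Integer> tuples) {
        this.type = type;
        this.tuples = tuples;
    }
}

// Listener for a tumbling window of integer tuples. It does no work on INSERTION
// and "submits" the sum of the evicted tuples when the window tumbles.
final class TumblingSumListener {
    // Stands in for the output port: each element is one submitted aggregate.
    final List<Integer> submitted = new ArrayList<>();

    void handleEvent(WindowEvent event) {
        switch (event.type) {
            case INSERTION:
                // Nothing to do: the aggregate is computed only when the window tumbles.
                break;
            case EVICTION: {
                // The window tumbled, so event.tuples is the complete set being evicted.
                int sum = 0;
                for (int t : event.tuples) {
                    sum += t;
                }
                submitted.add(sum);
                break;
            }
            case TRIGGER:
                // Triggers apply to sliding windows; this tumbling listener ignores them.
                break;
        }
    }
}
```

In this non-incremental version all of the work happens on EVICTION, over the complete set of evicted tuples.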
The operator's StreamWindowListener implements the operator's logic for the window by handling the window's events. Typically this is an algorithm that aggregates the window's contents, but there are no restrictions on how the events are handled.
Ideally the aggregation can be maintained incrementally as each tuple is inserted into the window. This reduces latency when the window tumbles or is triggered, because the algorithm does not have to be computed over the complete set of tuples in the window.
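The incremental approach can be sketched for a sliding window that reports its sum on each TRIGGER. As before, `EventType` and `IncrementalSumListener` are hypothetical stand-ins rather than the real StreamWindowListener API; the running total is updated on every INSERTION and EVICTION, so handling a TRIGGER does not revisit the tuples in the window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamWindowEvent.Type.
enum EventType { INSERTION, EVICTION, TRIGGER }

// Listener for a sliding window of long tuples that keeps the window's sum current
// on every INSERTION and EVICTION, so a TRIGGER needs no pass over the window contents.
final class IncrementalSumListener {
    private long runningSum = 0;                    // sum of the tuples currently in the window
    final List<Long> submitted = new ArrayList<>(); // stands in for the output port

    void handleEvent(EventType type, List<Long> tuples) {
        switch (type) {
            case INSERTION:
                for (long t : tuples) {
                    runningSum += t;                // tuples entering the window
                }
                break;
            case EVICTION:
                for (long t : tuples) {
                    runningSum -= t;                // tuples leaving the window
                }
                break;
            case TRIGGER:
                submitted.add(runningSum);          // constant time: the sum is already current
                break;
        }
    }
}
```

This works because a sum can be undone on eviction by subtraction; an aggregate such as a maximum cannot be maintained this way and would need a different structure.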

When a window is partitioned, the operator provides an implementation of StreamWindowPartitioner that maps each Tuple to a partition key through getPartition(tuple).
Each unique partition key creates its own independent window partition with the window's policy definitions. Thus each window partition independently honors the eviction and trigger policies.
For example, a tumbling window with a count eviction policy of size three, partitioned by a customer identifier, results in a unique window partition for each customer. At some point in time there might be two partitions, for customer identifiers A345 and B823, where the partition for A345 holds a single tuple and the partition for B823 holds two tuples. A single tuple for customer B823 arriving at the input port causes the partition for B823 to tumble, as it now contains three tuples. Partition A345 tumbles independently of any other partition; in this case two more tuples for A345 must arrive at the input port before it tumbles.
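A simplified model of this partitioned tumbling window follows. `Purchase`, `PartitionedTumblingWindow` and its `insert` and `sizeOf` methods are hypothetical; the `partitioner` function plays the role that StreamWindowPartitioner.getPartition(tuple) plays in the real API. In this model a partition that has tumbled is kept, empty, rather than discarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical tuple: a customer identifier and an amount.
final class Purchase {
    final String customerId;
    final int amount;

    Purchase(String customerId, int amount) {
        this.customerId = customerId;
        this.amount = amount;
    }
}

// Simplified partitioned tumbling window with a count eviction policy of `size` tuples.
final class PartitionedTumblingWindow<T, K> {
    private final int size;
    private final Function<T, K> partitioner;           // maps a tuple to its partition key
    private final Map<K, List<T>> partitions = new HashMap<>();
    final List<K> tumbled = new ArrayList<>();          // keys of partitions that tumbled, in order

    PartitionedTumblingWindow(int size, Function<T, K> partitioner) {
        this.size = size;
        this.partitioner = partitioner;
    }

    void insert(T tuple) {
        K key = partitioner.apply(tuple);
        // A new key lazily creates its own, independent partition.
        List<T> contents = partitions.computeIfAbsent(key, k -> new ArrayList<>());
        contents.add(tuple);
        if (contents.size() == size) {
            // Only this partition tumbles: all of its tuples are evicted.
            tumbled.add(key);
            contents.clear();
        }
    }

    int sizeOf(K key) {
        List<T> contents = partitions.get(key);
        return contents == null ? 0 : contents.size();
    }
}
```

Replaying the example: one tuple for A345 and two for B823 leave no partition full; a third B823 tuple tumbles only B823, and A345 still holds its single tuple.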

When an operator is checkpointed, either because a checkpoint configuration exists or because the operator is in a consistent region, the SPL runtime persists the complete state of the window, including its contents and any state information required to correctly restart event processing if the operator later resets to that checkpoint. This window state is always persisted and does not require the operator to register a StateHandler.
Typically the operator also maintains its own window-related state, such as the state of the aggregation being performed; this state may be held in instance fields of the operator class or of its StreamWindowListener. This operator-specific state is not persisted automatically by the SPL runtime; instead, the operator registers a StateHandler instance to persist such state on a checkpoint.
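The split between runtime-persisted window contents and operator-persisted aggregation state can be sketched as follows. `RunningSumState` and `InMemoryCheckpoint` are hypothetical; the `checkpoint`, `reset` and `resetToInitialState` methods loosely mirror the corresponding StateHandler callbacks, but their plain object-stream parameters are a simplifying assumption, not the StateHandler signatures. Only the running aggregate is written; the tuples themselves are left to the runtime.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Operator-specific aggregation state, which the runtime does NOT persist with the window.
final class RunningSumState {
    long sum;    // running aggregate over the window
    long count;  // number of tuples aggregated

    void checkpoint(ObjectOutputStream out) throws IOException {
        out.writeLong(sum);
        out.writeLong(count);
    }

    void reset(ObjectInputStream in) throws IOException {
        sum = in.readLong();
        count = in.readLong();
    }

    void resetToInitialState() {
        sum = 0;
        count = 0;
    }
}

// Round-trips the state through a byte array standing in for a persisted checkpoint.
final class InMemoryCheckpoint {
    static byte[] save(RunningSumState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            state.checkpoint(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static void restore(RunningSumState state, byte[] checkpoint) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(checkpoint))) {
            state.reset(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```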
Each window performs the following actions for each state event:

  • drain - Any outstanding EVICTION events are completed, waiting for each StreamWindowListener.handleEvent() to return.
  • checkpoint - Contents (tuples) of all window partitions are persisted along with any state required to correctly restart event processing.
  • reset to initial state - All window contents are discarded without any event handling, and the window is reset as though it had just been created. If the window is partitioned, all partitions are discarded and are not recreated (until created by subsequent tuple processing).
  • reset from a checkpoint - All window contents are discarded without any event handling. All partitions that exist in the checkpoint are recreated with their persisted contents and event handling is restarted to correctly honor the eviction and trigger policies. Any partitions that existed before the reset but not in the checkpoint are discarded and not recreated (until created by subsequent tuple processing).
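The effect of the two reset actions on a partitioned window's contents can be modelled with a map from partition key to tuples. `PartitionedWindowContents` is a hypothetical simplification: a checkpoint is a deep copy of the map, drain is omitted because the model has no pending events, and restarting event handling after a reset is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a partitioned window's contents under checkpoint and reset.
final class PartitionedWindowContents {
    Map<String, List<Integer>> partitions = new HashMap<>();

    void insert(String key, int tuple) {
        partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    }

    // checkpoint: persist the contents of every partition.
    Map<String, List<Integer>> checkpoint() {
        return deepCopy(partitions);
    }

    // reset to initial state: discard all partitions, as if the window were new.
    void resetToInitialState() {
        partitions = new HashMap<>();
    }

    // reset from a checkpoint: discard current contents and recreate exactly the
    // partitions in the checkpoint; partitions created after it are not recreated.
    void reset(Map<String, List<Integer>> checkpoint) {
        partitions = deepCopy(checkpoint);
    }

    private static Map<String, List<Integer>> deepCopy(Map<String, List<Integer>> src) {
        Map<String, List<Integer>> copy = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : src.entrySet()) {
            copy.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return copy;
    }
}
```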
Note that time-based policies use wall-clock time and are correctly honored when processing resumes after a reset from a checkpoint; that is, time is not stopped or reset by any reset operation.
For example, a sliding time-based eviction policy of five seconds states that any tuple that has been in the window for more than five seconds must be evicted. A checkpoint may therefore persist a window containing 20 tuples that had four more seconds to live in the window when the checkpoint was taken. However, if a reset to that checkpoint occurs more than four seconds later, then by the window's eviction policy all those tuples must be evicted, even though they have just been restored. The tuples are therefore evicted through an EVICTION event.
Similarly, with a tumbling time eviction policy of 30 seconds (since the last tumble), any time during which the operator was not active before the reset is included in the calculation, so the tumble takes place 30 seconds after the last tumble recorded in the checkpoint. If the tumble is already overdue when the reset occurs, for example because the PE was down for 45 seconds, the tumble takes place as soon as processing resumes.
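Both time-based examples reduce to comparing timestamps restored from the checkpoint with the current wall-clock time. In the hypothetical `WallClockPolicies` helper below, times are in milliseconds. For the sliding example, assume 20 tuples inserted at t = 0, a checkpoint at t = 1 s (four seconds left to live) and a reset at t = 6 s; for the tumbling example, assume a last tumble at t = 0 and a reset at t = 45 s.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified wall-clock checks for time-based policies after a reset. Insertion and
// last-tumble times come from the checkpoint unchanged, so downtime counts toward the policy.
final class WallClockPolicies {

    // Sliding time eviction: returns the insertion times of tuples that have been in the
    // window for more than windowMillis at nowMillis, i.e. the tuples that must be evicted.
    static List<Long> expired(List<Long> insertTimesMillis, long nowMillis, long windowMillis) {
        List<Long> evicted = new ArrayList<>();
        for (long inserted : insertTimesMillis) {
            if (nowMillis - inserted > windowMillis) {
                evicted.add(inserted);
            }
        }
        return evicted;
    }

    // Tumbling time eviction: the next tumble is due periodMillis after the last recorded
    // tumble. Returns the delay until then, or 0 if the tumble is already overdue.
    static long millisUntilTumble(long lastTumbleMillis, long nowMillis, long periodMillis) {
        long due = lastTumbleMillis + periodMillis;
        return Math.max(0L, due - nowMillis);
    }
}
```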
If the operator has registered a StateHandler, its methods are invoked after the corresponding actions have completed for all of its windowed input ports. For example, when StateHandler.drain() is called, all windowed input ports are guaranteed to have been drained.
The only change to window contents that a StateHandler may make is the eviction of partitions using StreamWindow.evictPartition(Object) during its drain processing, which requires the window to be partitioned.
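As an illustration of this one permitted change, the sketch below evicts partitions whose most recent tuple is older than a threshold. The idle-time criterion, `IdleTrackingWindow` and `IdlePartitionDrainer` are all hypothetical; `evictPartition` only plays the role of StreamWindow.evictPartition(Object), and `drain` takes the current time as a parameter purely so the example is deterministic, unlike StateHandler.drain().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical partitioned window that records, per partition key, the wall-clock time (ms)
// of the most recent tuple. evictPartition stands in for StreamWindow.evictPartition(Object).
final class IdleTrackingWindow {
    final Map<String, Long> lastActivity = new HashMap<>();

    void insert(String partitionKey, long nowMillis) {
        lastActivity.put(partitionKey, nowMillis);
    }

    void evictPartition(Object partitionKey) {
        lastActivity.remove(partitionKey);
    }
}

// Operator-side drain logic, assumed to run after the window itself has been drained.
// Evicting whole partitions is the only change it makes to the window's contents.
final class IdlePartitionDrainer {
    private final IdleTrackingWindow window;
    private final long maxIdleMillis;

    IdlePartitionDrainer(IdleTrackingWindow window, long maxIdleMillis) {
        this.window = window;
        this.maxIdleMillis = maxIdleMillis;
    }

    void drain(long nowMillis) {
        // Collect keys first so the map is not modified while it is being iterated.
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> e : window.lastActivity.entrySet()) {
            if (nowMillis - e.getValue() > maxIdleMillis) {
                idle.add(e.getKey());
            }
        }
        for (String key : idle) {
            window.evictPartition(key);
        }
    }
}
```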