Package com.ibm.streams.operator.window
Interface Summary
- CheckpointableWindowListener<O extends Operator,T>
- StreamWindow<T> - A window connected to an input port (stream).
- StreamWindowListener<T> - Handle StreamWindowEvents being generated by a StreamWindow.
- StreamWindowPartitioner<T,P> - StreamWindowPartitioner provides the mapping from a tuple being inserted into the window to a partition.
Class Summary
- AbstractWindowOperator - Abstract operator that forces all tuple processing to be through window event handling.
- StatefulWindowListener<S,T> - Abstract class that registers listeners with a StreamWindow.
- StreamWindowEvent<T> - An event for a windowed input port.
- WindowUtilities - Utility methods for windowed input ports.
Enum Summary
- StreamWindow.Policy - Policy type for eviction and trigger policies.
- StreamWindow.Type - Window type.
- StreamWindowEvent.Type - Stream window event type.
Package com.ibm.streams.operator.window Description
A window connected to an input port is represented by StreamWindow; see its documentation for definitions of tumbling and sliding windows.
A window's definition is declared either by the invocation of an operator in an SPL application, or by the window configuration of the operator's InputPortDeclaration (see tumbling() or sliding()) in a Java declared flow.
A window generates events as StreamWindowEvent objects, which an operator handles by implementing StreamWindowListener and registering the implementation with StreamWindow.registerListener(StreamWindowListener, boolean). Each event has a specific StreamWindowEvent.Type. The most significant event types are:
- StreamWindowEvent.Type.INSERTION - One or more tuples have been inserted into the window.
- StreamWindowEvent.Type.EVICTION
  - For a tumbling window, the window has tumbled and all tuples are being evicted. This is typically where tuples are submitted that represent some aggregation of the window, that is, of the set of tuples being evicted.
  - For a sliding window, the window has slid and one or more tuples are being evicted, as defined by the eviction policy.
- StreamWindowEvent.Type.TRIGGER - A sliding window is being triggered according to its trigger policy. This is typically where tuples are submitted that represent some aggregation of the window, that is, of the set of tuples currently in the window.
A StreamWindowListener implements the logic against the window by handling the window's events. Typically this is an algorithm that aggregates the window's contents, but there are no restrictions on how the events are handled.
Ideally the aggregation is maintained incrementally as each tuple is inserted into the window. This reduces latency when the window tumbles or is triggered, because the result does not have to be computed over the complete set of tuples in the window at that point.
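The incremental approach can be illustrated with a minimal stand-alone sketch. It does not use the Streams API: the class IncrementalAverageSketch, its EventType enum, and the handleEvent signature are hypothetical stand-ins that only mimic the roles of StreamWindowListener and StreamWindowEvent.Type for a tumbling window. The running sum and count are updated on each insertion, so the eviction handler only has to divide and reset.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of an incrementally maintained average (not the real API). */
final class IncrementalAverageSketch {

    /** Hypothetical stand-in for the event types described above. */
    enum EventType { INSERTION, EVICTION }

    private long count;   // number of tuples currently in the window
    private double sum;   // running sum of their values
    private final List<Double> submitted = new ArrayList<>();

    /** Handles one window event carrying the values of the affected tuples. */
    void handleEvent(EventType type, List<Double> values) {
        switch (type) {
        case INSERTION:
            // Update the aggregate as each tuple arrives.
            for (double v : values) {
                sum += v;
                count++;
            }
            break;
        case EVICTION:
            // Tumbling window: every tuple leaves, so emit the result and reset.
            if (count > 0) {
                submitted.add(sum / count);
            }
            sum = 0.0;
            count = 0;
            break;
        }
    }

    /** Results that a real operator would have submitted to an output port. */
    List<Double> submitted() {
        return submitted;
    }
}
```

In a real operator the same bookkeeping would live in the listener's event handler, and the value recorded here would instead be submitted as an output tuple.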
When a window is partitioned, the operator provides an implementation of StreamWindowPartitioner that maps a Tuple to a partition key, using getPartition(tuple). Each unique partition key creates its own independent window partition with the window's policy definitions. Thus the set of tuples in each window partition independently honors any eviction and trigger policies.
For example, a tumbling window with a count eviction policy of size three, partitioned by a customer identifier, results in a separate window partition for each customer. At some point in time there might be two partitions, for customer identifiers A345 and B823, where the partition for A345 holds a single tuple and the partition for B823 holds two tuples. A single tuple for customer B823 arriving at the input port causes the partition for B823 to tumble, as it now contains three tuples. Partition A345 tumbles independently of any other partition, and in this case requires two more tuples for A345 to arrive at the input port.
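The customer example can be traced with a small stand-alone model. This is only a sketch under stated assumptions: PartitionedTumblingSketch and its methods are hypothetical, tuples are plain strings, and the partition key is passed in directly rather than computed by a StreamWindowPartitioner. Each key owns its own list of tuples, and a partition tumbles on its own as soon as it holds three tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone model of a partitioned, count-based tumbling window (not the real API). */
final class PartitionedTumblingSketch {

    private final int size;  // count eviction policy: tumble at this many tuples
    private final Map<String, List<String>> partitions = new HashMap<>();

    PartitionedTumblingSketch(int size) {
        this.size = size;
    }

    /**
     * Inserts a tuple into the partition for the given key.
     * Returns the evicted tuples if that partition tumbled, otherwise an empty list.
     */
    List<String> insert(String customerId, String tuple) {
        // A partition is created the first time its key is seen.
        List<String> contents =
                partitions.computeIfAbsent(customerId, k -> new ArrayList<>());
        contents.add(tuple);
        if (contents.size() < size) {
            return List.of();
        }
        // Only this partition tumbles; other partitions are untouched.
        List<String> evicted = new ArrayList<>(contents);
        contents.clear();
        return evicted;
    }

    /** Number of tuples currently held for the given key. */
    int tuplesIn(String customerId) {
        List<String> contents = partitions.get(customerId);
        return contents == null ? 0 : contents.size();
    }
}
```

Starting from one tuple for A345 and two for B823, a further insert for B823 returns all three B823 tuples, while A345 still holds one tuple and needs two more inserts before it tumbles.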
When an operator is checkpointed, either because a checkpoint configuration exists or because the operator is in a consistent region, the SPL runtime persists the complete state of the window, including its contents and any state information required to correctly restart event processing if the operator later resets to that checkpoint. This window state is always persisted and does not require the operator to register a StateHandler.
Typically the operator also maintains its own window-related state, such as the state of the aggregation being performed. This state may be held in instance fields of the operator class or of its StreamWindowListener. This operator-specific state is not persisted automatically by the SPL runtime; instead, the operator registers a StateHandler instance to persist such state on a checkpoint.
Each window performs these actions on these state events:
- drain - Any outstanding EVICTION events are completed, waiting for each StreamWindowListener.handleEvent() to return.
- checkpoint - Contents (tuples) of all window partitions are persisted along with any state required to correctly restart event processing.
- reset to initial state - All window contents are discarded without any event handling, and the window is reset as though it had just been created. If the window is partitioned, all partitions are discarded and not recreated (until created by subsequent tuple processing).
- reset from a checkpoint - All window contents are discarded without any event handling. All partitions that exist in the checkpoint are recreated with their persisted contents, and event handling is restarted to correctly honor the eviction and trigger policies. Any partitions that existed before the reset but not in the checkpoint are discarded and not recreated (until created by subsequent tuple processing).
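The effect of the checkpoint and reset actions on window partitions can be modelled with a short stand-alone sketch. It is not how the SPL runtime is implemented: WindowStateSketch and its methods are hypothetical, a checkpoint is simply an in-memory copy, and policy timers and event handling are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone model of window partition state across checkpoint and reset. */
final class WindowStateSketch {

    private Map<String, List<String>> partitions = new HashMap<>();

    /** Tuple processing creates a partition on first use of its key. */
    void insert(String key, String tuple) {
        partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    }

    /** checkpoint: the contents of every partition are captured. */
    Map<String, List<String>> checkpoint() {
        return deepCopy(partitions);
    }

    /** reset to initial state: all partitions are discarded and not recreated. */
    void resetToInitialState() {
        partitions = new HashMap<>();
    }

    /**
     * reset from a checkpoint: current contents are discarded, and only the
     * partitions present in the checkpoint are recreated with their contents.
     */
    void reset(Map<String, List<String>> checkpoint) {
        partitions = deepCopy(checkpoint);
    }

    /** Current partitions and their tuples. */
    Map<String, List<String>> contents() {
        return partitions;
    }

    private static Map<String, List<String>> deepCopy(Map<String, List<String>> source) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : source.entrySet()) {
            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return copy;
    }
}
```

A partition created after the checkpoint was taken disappears on reset and only reappears when a later tuple maps to its key, which matches the last two list items above.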
For example, a sliding window with a time-based eviction policy of five seconds requires that any tuple that has been in the window for more than five seconds be evicted. A checkpoint can therefore persist a window containing 20 tuples that each had four more seconds to live in the window at the time the checkpoint was taken. If a reset to that checkpoint occurs more than four seconds later, then by the window's eviction policy all those tuples must be evicted, even though they have just been restored. The tuples are then evicted through an EVICTION event.
Similarly, with a tumbling window that has a time-based eviction policy of 30 seconds (since the last tumble), any time during which the operator was not active before the reset is included in the calculation, so that the tumble takes place 30 seconds after the last tumble recorded in the checkpoint. If the tumble is already overdue when the reset occurs, say because the PE was down for 45 seconds, then the tumble takes place as soon as processing is resumed.
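The arithmetic behind both examples is sketched below. TimePolicyResetSketch and its method names are hypothetical, all times are plain millisecond values, and the code only models the rule described above rather than the runtime's actual timer handling.

```java
/** Stand-alone sketch of time-based policy arithmetic after a reset. */
final class TimePolicyResetSketch {

    private TimePolicyResetSketch() {
    }

    /**
     * Sliding window: a restored tuple must be evicted at once if more time
     * has elapsed since the checkpoint than the tuple had left to live.
     */
    static boolean expiredAfterReset(long remainingAtCheckpointMs,
                                     long elapsedSinceCheckpointMs) {
        return elapsedSinceCheckpointMs > remainingAtCheckpointMs;
    }

    /**
     * Tumbling window: delay until the next tumble, measured from the last
     * tumble recorded in the checkpoint. Zero means the tumble is overdue
     * and happens as soon as processing resumes.
     */
    static long delayUntilTumbleMs(long periodMs, long lastTumbleMs, long nowMs) {
        long dueMs = lastTumbleMs + periodMs;
        return Math.max(0L, dueMs - nowMs);
    }
}
```

With the numbers from the text, a tuple with 4000 ms left is expired by a reset 5000 ms later, and a 30000 ms tumble period is overdue (delay 0) when 45000 ms have passed since the last recorded tumble.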
If the operator registered a StateHandler, its methods are invoked after the corresponding actions have occurred for all its windowed input ports. For example, when StateHandler.drain() is called, it is guaranteed that all windowed input ports have been drained.
The only change in window contents that a StateHandler may perform is eviction of partitions using StreamWindow.evictPartition(Object) during its drain processing. In this case the window must be partitioned.