Window
The Window processor produces new batches of data from incoming batches based on the configured window type. Use the Window processor in streaming pipelines only. The processor is not supported in batch pipelines.
The Window processor creates windows of data and passes each window downstream as a batch. The processor can create the following types of windows: tumbling, sliding, session, and hopping.
You might use the Window processor in streaming pipelines to merge small batches into larger batches for more meaningful downstream processing.
For example, when using an Aggregate processor in a streaming pipeline, you might place the Window processor before the Aggregate processor to create larger batches for aggregate calculations. Similarly, when using a Join processor in a streaming pipeline, you might use the Window processor before the Join processor to enable joins on larger data sets.
When you configure the Window processor, you specify the window type to use and the properties for the window type.
When a window contains no data, the Window processor passes an empty batch downstream.
Window Types
The Window processor creates windows based on the configured window type, then passes the window downstream as a batch.
- Tumbling: Creates windows of the specified length, opening a new window as soon as the previous one closes. You specify the window length in milliseconds. The first window begins when you start the pipeline.
- Sliding: Creates a window of a specified length when a triggering condition occurs. You specify the window length in milliseconds. You can configure the condition using any Spark SQL syntax that can be used in the WHERE clause of a query.
Use the sliding window type to create windows when a specified condition occurs. When the condition occurs, the processor creates a window that includes the data that arrived within the window length and the batch that contains the triggering condition. Depending on the window configuration and the flow of data, the sliding window can drop records or pass duplicate records downstream.
For example, say you set the window length to 1000 milliseconds and the condition to inventory is NULL, and say ten batches arrive every second. Then every time the inventory field contains a null value, the processor creates a window that includes the ten batches that arrived in the previous second as well as the batch that includes the record with the null inventory field.
In this example, if the next batch also contains a null inventory field, the new window contains duplicate data. If the next null inventory field arrives 5 seconds later, the new window covers only the last of those 5 seconds, so the data that arrived in the roughly four seconds between the two windows is dropped from the pipeline.
- Session: Creates a window based on arriving data, closing the window when the specified interval of time elapses without incoming data. You configure the session timeout in milliseconds.
Use the session window type to create windows based on expected breaks in the incoming flow of data. With this window, all data is passed downstream exactly once. The session window does not drop records or pass duplicate records downstream. However, the size of the windows is dependent on the window configuration and the flow of data.
For example, say you want a downstream Deduplicate processor to deduplicate records from a regular flow of log data that pauses for one second at the beginning of every hour. To do this, in the Window processor, you set an 800-millisecond session timeout to trigger window creation.
Though this is 200 milliseconds less than the one-second pause, you know that this gap is long enough to indicate that the session should be closed. The shorter timeout also allows for the occasional case where the pause is slightly less than one second. The remaining 200 milliseconds without data, at the beginning of the next window, is shorter than the timeout, so it does not trigger window creation.
In this example, if the flow of data changes, preventing a consistent one-second break, then the windows created by the processor may expand to an unexpected size.
- Hopping: Creates a window of a specified length, then hops forward to create another window. You configure both the length of the window and the hop interval in milliseconds. The hop interval is measured from the beginning of the previous window.
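To make the difference between the time-based window types concrete, the following Python sketch assigns timestamped records to tumbling and hopping windows. It is a simplified model, not the Window processor's implementation: it assumes each record carries an arrival time in milliseconds measured from pipeline start, and the functions hopping_windows and tumbling_windows, the end_ms cutoff, and the sample records are hypothetical names for illustration. It shows that a tumbling window behaves like a hopping window whose hop interval equals its length, that overlapping hopping windows repeat records while widely spaced ones leave records out, and that a window with no data still produces an empty batch.

```python
from typing import List, Tuple

# Hypothetical record model for this sketch: (arrival_ms, payload), with
# arrival_ms measured from pipeline start (t = 0).
Record = Tuple[int, str]


def hopping_windows(records: List[Record], length_ms: int, hop_ms: int,
                    end_ms: int) -> List[List[Record]]:
    """Open a window of length_ms every hop_ms, measured from the start of the
    previous window, and return one batch per window that closes by end_ms.

    hop_ms < length_ms: windows overlap, so some records appear in two batches.
    hop_ms > length_ms: windows leave gaps, so some records appear in no batch.
    A window that receives no records still yields an (empty) batch.
    """
    batches: List[List[Record]] = []
    start = 0
    while start + length_ms <= end_ms:
        stop = start + length_ms
        batches.append([r for r in records if start <= r[0] < stop])
        start += hop_ms
    return batches


def tumbling_windows(records: List[Record], length_ms: int,
                     end_ms: int) -> List[List[Record]]:
    """Model a tumbling window as a hopping window whose hop equals its length:
    each window opens as the previous one closes, so windows never overlap
    and never leave gaps."""
    return hopping_windows(records, length_ms, length_ms, end_ms)


if __name__ == "__main__":
    records = [(100, "a"), (900, "b"), (1500, "c"), (3200, "d")]
    # Four back-to-back 1000 ms windows; the third one is empty.
    print(tumbling_windows(records, 1000, 4000))
    # 1000 ms windows every 500 ms overlap, so "b", "c", and "d" repeat.
    print(hopping_windows(records, 1000, 500, 4000))
    # 500 ms windows every 1000 ms leave gaps, so "b" and "c" are dropped.
    print(hopping_windows(records, 500, 1000, 4000))
```

Modeling tumbling windows on top of the hopping function keeps the sketch short and makes the relationship between the two types explicit; the processor itself exposes them as separate window types.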
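The session timeout example can be modeled the same way. The following Python sketch is an assumption-based illustration rather than the processor's actual logic: it treats a session window as closed when the gap between consecutive record arrival times reaches the timeout, and the function name session_windows and the sample arrival times are hypothetical. It mirrors the 800-millisecond timeout scenario, where a one-second pause and a slightly shorter pause both close the current window but the regular gaps in the data flow do not.

```python
from typing import List


def session_windows(arrivals_ms: List[int], timeout_ms: int) -> List[List[int]]:
    """Group record arrival times (ms) into session windows.

    Assumption for this sketch: a window closes once timeout_ms passes with
    no incoming data, so a record arriving timeout_ms or more after the
    previous record starts a new window. Every record lands in exactly one
    window, so nothing is dropped or duplicated.
    """
    windows: List[List[int]] = []
    for t in sorted(arrivals_ms):
        if windows and t - windows[-1][-1] < timeout_ms:
            windows[-1].append(t)  # gap shorter than the timeout: same session
        else:
            windows.append([t])  # first record, or the timeout elapsed: new session
    return windows


if __name__ == "__main__":
    # Hypothetical log flow: a record every 200 ms, a 1000 ms pause, then a
    # slightly short 900 ms pause. With an 800 ms timeout, both pauses close
    # the current window, but the regular 200 ms gaps never do.
    arrivals = [0, 200, 400, 600, 800, 1800, 2000, 2200, 3100]
    print(session_windows(arrivals, 800))
    # Without the pauses, everything falls into one ever-growing window.
    print(len(session_windows(list(range(0, 5000, 200)), 800)))
```

The second call illustrates the caveat in the text: when the expected break never arrives, window size is bounded only by the data flow.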
Sliding Window Condition
When you use the sliding window type, you configure a condition. The condition determines when the Window processor creates a sliding window. The condition must evaluate to true or false. When the condition evaluates to true, the Window processor creates a window of the specified size and passes it downstream as a batch. The window includes the data that arrived within the window length and the batch that contains the triggering condition.
You can use a condition as simple as ID is NULL to create windows when a record with a null ID field arrives, or you can create as complex a condition as needed.
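To see how a condition-triggered window can pass duplicate records or drop records, the following Python sketch replays the inventory example from the sliding window description. It is a simplified model, not the processor's implementation: it assumes batches arrive with a timestamp in milliseconds, uses a Python lambda as a stand-in for the Spark SQL condition inventory is NULL, and the names sliding_windows, Batch, and the sample batches are hypothetical. With ten batches per second and a 1000-millisecond window, triggers at 2000 ms and 2100 ms produce windows that share ten batches, while the batches from 2200 ms through 6000 ms, before the window for the trigger at 7100 ms, land in no window.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]
Batch = Tuple[int, List[Record]]  # (arrival_ms, records in the batch)


def sliding_windows(batches: List[Batch], length_ms: int,
                    condition: Callable[[Record], bool]) -> List[List[int]]:
    """Return, for each triggered window, the arrival times of its batches.

    Each time a batch contains a record that satisfies the condition, the
    window holds the batches that arrived within the preceding length_ms plus
    the triggering batch itself.
    """
    windows: List[List[int]] = []
    for trigger_ms, records in batches:
        if any(condition(r) for r in records):
            window = [t for t, _ in batches if trigger_ms - length_ms <= t < trigger_ms]
            window.append(trigger_ms)  # include the triggering batch
            windows.append(window)
    return windows


if __name__ == "__main__":
    # Ten batches per second (one every 100 ms), one record each. Null
    # inventory values arrive at 2000 ms, 2100 ms, and 7100 ms.
    null_at = {2000, 2100, 7100}
    batches = [(t, [{"inventory": None if t in null_at else 5}])
               for t in range(0, 8000, 100)]
    # Python stand-in for the Spark SQL condition: inventory is NULL
    windows = sliding_windows(batches, 1000, lambda r: r["inventory"] is None)
    for w in windows:
        print(w[0], "to", w[-1], f"({len(w)} batches)")
    # Batches shared by the first two windows are passed downstream twice.
    print(len(set(windows[0]) & set(windows[1])))
```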
- When you define a condition, you typically base it on field values in the record. For information about referencing fields in the condition, see Referencing Fields in Spark SQL Expressions.
- You can use any Spark SQL syntax that can be used in the WHERE clause of a query, including functions such as isnull or trim and operators such as = or <=. You can also use user-defined functions (UDFs), but you must define the UDFs in the pipeline. Use a pipeline preprocessing script to define UDFs.
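For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation. For example, the following condition uses the year function to create a window when a record has a transaction date in 2018 or earlier:
year(transaction_date) <= 2018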
Sample Conditions
The following table lists some common scenarios that you might adapt for your use:
Condition Example | Description |
---|---|
transactions <= 0 | When the value in the transactions field is less than or equal to 0, the processor creates a sliding window of the specified size. |
accountId is NULL | When the accountId field contains a null value, the processor creates a sliding window of the specified size. Note that … |
upper(message) like '%ERROR%' | When the message field contains the string ERROR, the processor creates a sliding window of the specified size. The condition changes the strings in the message field to uppercase before evaluating them, so the match is not case-sensitive. |
initcap(country) like 'China' OR initcap(country) like 'Japan' | When the value in the country field is China or Japan, the processor creates a sliding window of the specified size. The condition changes the strings in the country field to initial capitals before evaluating them, so values such as CHINA or japan also match. |
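The sample conditions are written in Spark SQL and evaluated by Spark. Purely to illustrate what each one tests, the following Python sketch reimplements them as plain predicates. This is an approximation, not how the processor evaluates conditions: the names SAMPLE_CONDITIONS, triggers, and the sample record are hypothetical, the initcap helper approximates the Spark SQL function of the same name, and each comparison returns False for a null field to mirror the way a NULL comparison result does not satisfy a WHERE clause.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]


def initcap(s: str) -> str:
    """Approximate Spark SQL initcap: uppercase the first letter of each
    space-delimited word and lowercase the rest."""
    return " ".join(w[:1].upper() + w[1:].lower() for w in s.split(" "))


# Python stand-ins for the sample Spark SQL conditions. A comparison against a
# null field yields NULL in Spark SQL, which does not trigger a window, so the
# comparison predicates return False when the field is missing or None.
SAMPLE_CONDITIONS: Dict[str, Callable[[Record], bool]] = {
    "transactions <= 0":
        lambda r: r.get("transactions") is not None and r["transactions"] <= 0,
    "accountId is NULL":
        lambda r: r.get("accountId") is None,
    "upper(message) like '%ERROR%'":
        lambda r: r.get("message") is not None and "ERROR" in r["message"].upper(),
    "initcap(country) like 'China' OR initcap(country) like 'Japan'":
        lambda r: r.get("country") is not None and initcap(r["country"]) in ("China", "Japan"),
}


def triggers(record: Record) -> Dict[str, bool]:
    """Report which sample conditions would trigger a sliding window for one record."""
    return {sql: pred(record) for sql, pred in SAMPLE_CONDITIONS.items()}


if __name__ == "__main__":
    record = {"transactions": 3, "accountId": 17,
              "message": "disk error on node 4", "country": "JAPAN"}
    # Only the message and country conditions fire for this record.
    for sql, fired in triggers(record).items():
        print(f"{fired!s:5}  {sql}")
```

Because neither pattern in the last two rows mixes wildcards with case changes beyond what the table shows, the sketch uses a substring test for '%ERROR%' and an equality test for 'China' and 'Japan' instead of a general LIKE implementation.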