Aggregation operator examples
The following examples show how to use aggregation operators:
Example 1: Aggregating Clickstream data
Goal: Use sample Clickstream data to answer the following questions about online sales activity.
-
For every minute, how many unique customers are there for every event type?
-
For every minute, what is the total price of items in a customer’s shopping cart for every event type?
Clickstream data is data that is generated by a user’s interaction with an online website. For example, the user might browse for an item, add the item to a shopping cart, remove an item from the cart, log out without any purchase, or complete a purchase. Every click generates an event that can be captured in a streams flow.
Let’s design a simple streams flow in the canvas with three operators.
-
Sample Data – This operator provides sample Clickstream data.
-
Aggregation – This operator aggregates the data by customer ID and then applies specified functions on the data every minute.
-
‘Debug’ operator - This operator acts as a target so that you can view tuples coming from a selected operator and see if they represent the expected output.

Now let’s define each operator in its Properties pane.
Define Sample Data operator
Select Clickstream. Click Edit Source Schema to look at the schema that’s coming in to the streams flow.
Define Aggregation operator
We define the parameters of this operator to give the metrics that we need to answer our two questions. Set the following properties.

Aggregation Window Type is set to tumbling because we want the aggregation functions done at a set time interval, regardless of how often or how many tuples arrive. For more
information, see tumbling windows, sliding windows.
Aggregation Window size is determined by Time Unit and Number of Time Units. In our case, we’ll set it to minute and 1 because we want metrics every minute. Every minute, all tuples ‘tumble out’ and an Aggregation function is applied to each field.
We select the Use timestamp check box because we want to use the timestamp of each event rather than use the system timestamp of when the tuple arrived
The window is partitioned by customer_id into subwindows. This partition gives us metrics by customer.

We set up three Aggregation functions to define what metrics are done on all tuples in every subwindow (meaning, every customer). The following Aggregation functions are done on all subwindows every minute.
-
click_event_type – The function PassThrough passes the type of event that occurred to a field called “click_event_type”. This field tells us what action the user did, such as logging in, browsing, adding an item to the cart, and so on. The function
PassThroughpasses the schema attributes through to the output without doing any sort of computation or change. -
total_carts – The function Average computes the average price for a customer’s shopping cart and stores the value in a field called “total_carts“.
-
unique_users – The function CountDistinct gives the number of unique users that are logged in, and stores the value in a field called “unique_users”. We do not use the function Count because it gives the total number of users, including multiple entries of users.

Here’s how our streams flow looks now.

The following functions can be applied to specific schema attributes.
| Aggregate function | Argument type | Return type | Return value |
|---|---|---|---|
| Average | Integer, float | Float | Average of all input values. Null if no rows are selected. |
| Count | Integer | Same as argument | Number of all non-null input rows. |
| CountDistinct | Integer | Same as argument | Number of distinct expression values that are computed for the tuples in the group. |
| Maximum | Integer, float | Same as argument | Maximum element across all input values. Null if no rows are selected. |
| Minimum | Integer, float | Same as argument | Minimum element across all input values. Null if no rows are selected. |
| PassThrough | Integer, float, text | Same as argument | Passes schema attribute to the output as is. |
| Sum | Integer, float | Same as argument | Sum of all input values. Null if no rows are selected. |
| Standard Deviation | Integer, float | Same as argument | Standard deviation of all input values. Null if no rows are selected. |
Run the streams flow
All operations are now defined. We save the streams flow, and then click
to run the flow.
When the streams flow runs, the Ingestion Rate for the Clickstream data and the Throughput rate for the input and output of the Aggregation operator are shown.

Example 2: Parallel Aggregation
Let’s say that our online website has irregular peaks of data according to the hour of the day and the season.
In this case, use parallel Aggregation operators. Both Aggregation operators are set up identically, except that each operator uses a different value for Time and Number of Time Units to evict tuples.
In our Clickstream example, the first Aggregation operator has a tumbling window every minute. Let’s set the second Aggregation operator to have a tumbling window every 10 minutes.
More examples
For more examples of the Aggregation operator, see the article Calculate moving averages on real-time data in a streams flow.