Aggregation operator examples

The following examples show how to use aggregation operators:

Example 1: Aggregating Clickstream data

Goal: Use sample Clickstream data to answer the following questions about online sales activity.

  1. For every minute, how many unique customers are there for every event type?

  2. For every minute, what is the total price of items in a customer’s shopping cart for every event type?

Clickstream data is generated by a user’s interactions with a website. For example, the user might browse for an item, add the item to a shopping cart, remove an item from the cart, log out without making a purchase, or complete a purchase. Every click generates an event that can be captured in a streams flow.

Let’s design a simple streams flow in the canvas with three operators.

  1. Sample Data – This operator provides sample Clickstream data.

  2. Aggregation – This operator aggregates the data by customer ID and then applies specified functions on the data every minute.

  3. Debug – This operator acts as a target so that you can view the tuples coming from a selected operator and check whether they match the expected output.

    [Figure: Aggregation streams flow]

Now let’s define each operator in its Properties pane.

Define Sample Data operator

Select Clickstream. Click Edit Source Schema to view the schema of the data that comes into the streams flow.

Define Aggregation operator

We define the parameters of this operator to give the metrics that we need to answer our two questions. Set the following properties.

[Figure: Aggregator properties]

Aggregation Window Type is set to tumbling because we want the aggregation functions to run at a set time interval, regardless of how often tuples arrive or how many arrive. For more information, see the topics on tumbling windows and sliding windows.

Aggregation Window size is determined by Time Unit and Number of Time Units. In our case, we’ll set it to minute and 1 because we want metrics every minute. Every minute, all tuples ‘tumble out’ and an Aggregation function is applied to each field.

We select the Use timestamp check box because we want to use the timestamp of each event rather than the system timestamp of when the tuple arrived.

The window is partitioned by customer_id into subwindows. This partition gives us metrics by customer.
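
The windowing behavior described above can be sketched in Python. This is only an illustration of the concept, not how the Aggregation operator is implemented: the field name `time_stamp`, the helper functions, and the sample events are all assumptions made for this sketch. Each event is assigned to a one-minute tumbling window based on its own timestamp, and each window is split into subwindows by `customer_id`.

```python
from collections import defaultdict

# Window size: Time Unit = minute, Number of Time Units = 1
WINDOW_SECONDS = 60


def window_start(event_time, size=WINDOW_SECONDS):
    """Return the start (epoch seconds) of the tumbling window containing event_time."""
    return event_time - (event_time % size)


def partition_events(events, size=WINDOW_SECONDS):
    """Group events by (window start, customer_id), using each event's own timestamp."""
    subwindows = defaultdict(list)
    for event in events:
        key = (window_start(event["time_stamp"], size), event["customer_id"])
        subwindows[key].append(event)
    return dict(subwindows)


# Hypothetical sample events; time_stamp is in epoch seconds.
events = [
    {"customer_id": 1, "time_stamp": 5},
    {"customer_id": 1, "time_stamp": 59},
    {"customer_id": 2, "time_stamp": 30},
    {"customer_id": 1, "time_stamp": 61},
]

subwindows = partition_events(events)
for (start, customer), items in sorted(subwindows.items()):
    print(f"window={start} customer={customer} tuples={len(items)}")
```

Because the window is keyed on the event timestamp, an event that arrives late still lands in the minute in which it occurred, which is the reason for selecting Use timestamp in this example.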

Aggregation functions

We set up three Aggregation functions to define which metrics are computed over the tuples in every subwindow (that is, for every customer). These functions are applied to all subwindows every minute.

[Figure: Aggregation output fields]

Here’s how our streams flow looks now.

[Figure: Streams flow]

The following functions can be applied to specific schema attributes.

| Aggregate function | Argument type | Return type | Return value |
|---|---|---|---|
| Average | Integer, float | Float | Average of all input values. Null if no rows are selected. |
| Count | Integer | Same as argument | Number of all non-null input rows. |
| CountDistinct | Integer | Same as argument | Number of distinct expression values that are computed for the tuples in the group. |
| Maximum | Integer, float | Same as argument | Maximum element across all input values. Null if no rows are selected. |
| Minimum | Integer, float | Same as argument | Minimum element across all input values. Null if no rows are selected. |
| PassThrough | Integer, float, text | Same as argument | Passes schema attribute to the output as is. |
| Sum | Integer, float | Same as argument | Sum of all input values. Null if no rows are selected. |
| Standard Deviation | Integer, float | Same as argument | Standard deviation of all input values. Null if no rows are selected. |
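
To make the return values concrete, here is a minimal Python sketch of most of the functions listed above (PassThrough is omitted). It rests on several assumptions that the documentation does not confirm: null inputs are modeled as `None` and skipped by every function, the standard deviation is the population form, and the function names and sample prices are hypothetical. The "Null if no rows are selected" behavior is modeled by returning `None` for an empty input.

```python
import statistics


def non_null(values):
    """Drop null (None) inputs; skipping nulls everywhere is an assumption."""
    return [v for v in values if v is not None]


def average(values):
    vals = non_null(values)
    return sum(vals) / len(vals) if vals else None


def count(values):
    return len(non_null(values))


def count_distinct(values):
    return len(set(non_null(values)))


def maximum(values):
    vals = non_null(values)
    return max(vals) if vals else None


def minimum(values):
    vals = non_null(values)
    return min(vals) if vals else None


def total(values):
    vals = non_null(values)
    return sum(vals) if vals else None


def standard_deviation(values):
    # Assumption: population standard deviation; the operator may use the sample form.
    vals = non_null(values)
    return statistics.pstdev(vals) if vals else None


# Hypothetical prices from one customer's subwindow; None models a null value.
prices = [10.0, 20.0, None, 20.0]

print("count:", count(prices))
print("distinct:", count_distinct(prices))
print("sum:", total(prices))
print("max:", maximum(prices), "min:", minimum(prices))
print("empty average:", average([]))
```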

Run the streams flow

All operations are now defined. We save the streams flow, and then click the Metrics icon to run the flow.

When the streams flow runs, the Ingestion Rate for the Clickstream data and the Throughput rate for the input and output of the Aggregation operator are shown.

[Figure: Metrics page]

Example 2: Parallel Aggregation

Let’s say that traffic to our website peaks irregularly, depending on the hour of the day and the season.

In this case, use parallel Aggregation operators. Both Aggregation operators are set up identically, except that each operator uses a different value for Time Unit and Number of Time Units to evict tuples.

In our Clickstream example, the first Aggregation operator has a tumbling window every minute. Let’s set the second Aggregation operator to have a tumbling window every 10 minutes.
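
The effect of running two window sizes in parallel can be sketched in Python. This is a conceptual illustration only: the `count_per_window` helper and the sample timestamps are hypothetical, and the sketch simply counts the same events in 1-minute and 10-minute tumbling windows.

```python
from collections import Counter


def count_per_window(timestamps, window_seconds):
    """Count events in each tumbling window of the given size (epoch seconds)."""
    counts = Counter()
    for ts in timestamps:
        counts[ts - (ts % window_seconds)] += 1
    return dict(counts)


# Hypothetical event timestamps in epoch seconds.
timestamps = [10, 50, 70, 130, 590, 610]

# The same input feeds both aggregations, as with two parallel operators.
per_minute = count_per_window(timestamps, 60)         # first operator: 1 minute
per_ten_minutes = count_per_window(timestamps, 600)   # second operator: 10 minutes

print("1-minute windows:", per_minute)
print("10-minute windows:", per_ten_minutes)
```

The shorter window shows short-lived spikes, while the longer window smooths them into a broader trend, which is why the two views are useful side by side.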

More examples

For more examples of the Aggregation operator, see the article Calculate moving averages on real-time data in a streams flow.
