IBM Streams 4.2

Operator Aggregate

The Aggregate operator is used to compute user-specified aggregations over tuples that are gathered in a window.

Checkpointed data

When the Aggregate operator is checkpointed, the current state of the operator's window is saved in the checkpoint. The window is checkpointed incrementally to reduce checkpoint size and latency. For count and delta trigger and eviction policies, the checkpoint also includes the information that is needed to fulfill the policies. For example, for a count policy, the number of tuples that remain until the next eviction or trigger is checkpointed. Logic state variables (if present) are also included in the checkpoint.

Behavior in a consistent region

The Aggregate operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region.

When in a consistent region, the current state of the operator's windows is checkpointed. For count and delta trigger and eviction policies, the checkpoint includes the information that is needed to fulfill the policies. When the operator is reset, the current state of the windows is discarded and reloaded from the previous checkpoint. For count and delta trigger and eviction policies, the next eviction or trigger is reset to the state at the last checkpoint. For example, with a tumbling window and an eviction policy of count(20), if the Aggregate operator is checkpointed with state from six tuples, then after a reset it next evicts after it processes 14 more tuples on the input port, regardless of how many tuples were received between the last checkpoint and the reset.

When a window with time-based trigger and eviction policies is checkpointed or reset, timers continue to fire at the moments that are indicated by the policy, but trigger and eviction policies are disabled until the next tuple is inserted. For example, consider an Aggregate operator with a tumbling window and a policy of time(10), where timers fire at 0, 10, 20, 30, ... seconds. If the operator is checkpointed at 18 seconds and the next tuple is inserted at 22 seconds, the timer that fires at 20 seconds is ignored, and the first window eviction occurs at 30 seconds.
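The following is a minimal sketch of an Aggregate operator inside a consistent region, matching the count(20) case described above. The composite name, the stream names, the Beacon settings, and the 5-second checkpoint period are assumptions chosen for illustration. The region starts at the source operator, because the Aggregate operator cannot be the start of a region.

```spl
composite ConsistentAggregateSketch {
  graph
    // Applications that use consistent regions need a JobControlPlane operator.
    () as JCP = JobControlPlane() {}

    // Hypothetical source that starts the region; it is checkpointed every 5 seconds.
    @consistent(trigger = periodic, period = 5.0)
    stream<int32 salary> Src = Beacon() {
      param period: 0.1;
      output Src: salary = (int32) IterationCount();
    }

    // The Aggregate operator is reachable from Src, so it is part of the region.
    // If a checkpoint captures six tuples in the window, then after a reset
    // the window evicts once 14 more tuples arrive.
    stream<int32 maxSalary> Agg = Aggregate(Src) {
      window Src: tumbling, count(20);
      output Agg: maxSalary = Max(salary);
    }

    () as Sink = Custom(Agg) {
      logic onTuple Agg: println(Agg);
    }
}
```

Running such an application also depends on instance-level checkpoint configuration, which is outside the scope of this sketch.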

Checkpointing behavior in an autonomous region

When the Aggregate operator is in an autonomous region and is configured with a config checkpoint: periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Aggregate operator is in an autonomous region and is configured with a config checkpoint: operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
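As an illustration of the periodic case, the following sketch configures an Aggregate operator in an autonomous region with a checkpoint clause. The composite name, the stream names, and the 30-second interval are assumptions. The restartable setting is included on the assumption that the operator must be restartable for a restored checkpoint to be useful.

```spl
composite PeriodicCheckpointSketch {
  graph
    // Hypothetical input stream.
    stream<int32 salary> Beat = Beacon() {}

    stream<int32 maxSalary> Agg = Aggregate(Beat) {
      window Beat: tumbling, count(20);
      output Agg: maxSalary = Max(salary);
      config
        checkpoint: periodic(30.0);  // T = 30 seconds, chosen for illustration
        restartable: true;           // allow a restart that restores the last checkpoint
    }
}
```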

Windowing

The Aggregate operator supports the following window configurations:

tumbling, (count | delta | time | punctuation)-based eviction (, partitioned (, partitionEvictionSpec)? )?

sliding, (count | delta | time)-based eviction, (count | delta | time)-based trigger (, partitioned (, partitionEvictionSpec)? )?

For the tumbling variants, tuples are aggregated when the window gets full (and flushes). The tuples that contain the aggregates are output at once, followed by a window marker punctuation. More than one tuple can be output when the groupBy parameter is specified.

For the sliding variants, tuples are aggregated when the window triggers. The tuples that contain the aggregates are output at once, followed by a window marker punctuation. More than one tuple can be output when the groupBy parameter is specified.

The sliding windows for an Aggregate operator do not fire until the window is full for the first time unless aggregateIncompleteWindows is true. This rule does not apply to sliding windows with time-based trigger policies. Such windows are assumed to be full when they start out.

For both tumbling and sliding windows, when a time-based window with no tuples in it fires, only a window marker punctuation is output. When a tumbling, punctuation-based window with no tuples in it receives a window marker punctuation, only a window marker punctuation is output.

For the partitioned variants, the window specification and parameters apply to individual subwindows identified by the partitionBy parameter, as if there were separate Aggregate operators for each partition.

The final punctuation marker does not flush any of the pending windows.

Assignments

The Aggregate operator allows aggregated assignments to output attributes. An aggregated assignment has an aggregation function that appears on the right side of the assignment. Expressions that are passed to an aggregation function must not contain side effects, except for expressions that are passed to the Custom aggregation function.

Examples

These examples demonstrate how to use the Aggregate operator.

composite Main {
 graph
    stream<rstring name, uint64 id, rstring country,
           rstring city, uint32 age, int32 salary> Beat = Beacon() {}

    // tumbling window with no group by
    stream<int32 maxSalary, uint32 ageOfMaxSalary> Agg0 = Aggregate(Beat) {
      window Beat: tumbling, time(10.5);
      output Agg0: maxSalary = Max(salary), ageOfMaxSalary = ArgMax(salary, age);
    }

    // tumbling window with group by
    stream<rstring country, rstring city, int32 maxSalary> Agg1 = Aggregate(Beat) {
      window Beat: tumbling, punct();
      param groupBy: country, city;
      output Agg1: maxSalary = Max(salary);
    }

    // tumbling partitioned window with no group by
    stream<int32 maxSalary, int32 numPeopleWithMaxSalary> Agg2 = Aggregate(Beat) {
      window Beat: tumbling, delta(id, 10lu), partitioned;
      param partitionBy: country, city;
      output Agg2: maxSalary = Max(salary), numPeopleWithMaxSalary = MaxCount(salary);
    }

    // tumbling partitioned window with group by
    stream<rstring city, int32 maxSalary, list<rstring> peopleWithMaxSalary> Agg3 = Aggregate(Beat) {
      window Beat: tumbling, count(10), partitioned;
      param groupBy: city;
            partitionBy: country;
      output Agg3: maxSalary = Max(salary), peopleWithMaxSalary = CollectArgMax(salary, name);
    }

    // sliding window with no group by
    stream<Beat, tuple<int32 maxSalary, uint32 ageOfMaxSalary>> Agg4 = Aggregate(Beat) {
      window Beat: sliding, time(10.5), count(10);
      output Agg4: maxSalary = Max(salary), ageOfMaxSalary = ArgMax(salary, age);
    }

    // sliding window with group by
    stream<Beat, tuple<int32 maxSalary>> Agg5 = Aggregate(Beat) {
      window Beat: sliding, count(10), count(1);
      param groupBy: country, city;
      output Agg5: maxSalary = Max(salary);
    }

    // sliding partitioned window with no group by
    stream<Beat, tuple<int32 maxSalary, int32 numPeopleWithMaxSalary>> Agg6 = Aggregate(Beat) {
      window Beat: sliding, delta(id, 10lu), count(10), partitioned;
      param partitionBy: country, city;
      output Agg6: maxSalary = Max(salary), numPeopleWithMaxSalary = MaxCount(salary);
    }

    // sliding partitioned window with group by
    stream<Beat, tuple<int32 maxSalary, list<rstring> peopleWithMaxSalary>> Agg7 = Aggregate(Beat) {
      window Beat: sliding, count(10), time(1), partitioned;
      param groupBy: city;
            partitionBy: country;
      output Agg7: maxSalary = Max(salary), peopleWithMaxSalary = CollectArgMax(salary, name);
    }
}

The following example shows the use of the Custom output function in the Aggregate operator:

/* re-implement Average and Collect as Custom implementations to show the same use */
type AvgContext = int32 sum, int32 count;
type CollectContext = list<int32> values;

boolean init (mutable AvgContext c) {
  c.sum = 0;
  c.count = 0;
  return false;
}

boolean process (int32 value, mutable AvgContext c) {
  c.count++;
  c.sum += value;
  return false;
}

int32 result (AvgContext c) {
  if (c.count == 0)
    return 0;
  return c.sum/c.count;
}

boolean init (mutable CollectContext c) {
  clearM(c.values);
  return false;
}

boolean process (int32 value, mutable CollectContext c) {
  appendM(c.values, value);
  return false;
}

list<int32> result (CollectContext c) {
  return c.values;
}

composite Main {
  graph
    stream<int32 data>A = Beacon () {
      logic state: mutable int32 i = 0;
      param iterations: 100;
      output A: data = i++;
    }
    
    stream<int32 avg, list<int32> collect, int32 stdAvg, list<int32> stdCollect> B = Aggregate (A) {
      logic state: {
        mutable AvgContext avgContext;
        mutable CollectContext collectContext;
      }
      window A: tumbling, count(8);
      output B: avg = Custom (init(avgContext), process(data, avgContext), result (avgContext)),
                collect = Custom (init(collectContext), process(data, collectContext), result (collectContext)),
                stdAvg = Average (data),
                stdCollect = Collect(data);
    }

    () as Nul = FileSink (B) {
      param file : "out";
    }
}

Output attributes that have no assignment are automatically forwarded from the input attributes of the same name by using the Last aggregation function.
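To make the forwarding rule concrete, the following sketch shows two invocations that should produce the same output under that rule. The composite and stream names are hypothetical. In the first invocation the name attribute has no assignment, and in the second it is assigned explicitly with Last.

```spl
composite ForwardingSketch {
  graph
    // Hypothetical input stream.
    stream<rstring name, int32 salary> Beat = Beacon() {}

    // 'name' is not assigned, so it is forwarded as if written name = Last(name).
    stream<rstring name, int32 maxSalary> Implicit = Aggregate(Beat) {
      window Beat: tumbling, count(10);
      output Implicit: maxSalary = Max(salary);
    }

    // The same aggregation with the forwarding spelled out.
    stream<rstring name, int32 maxSalary> Explicit = Aggregate(Beat) {
      window Beat: tumbling, count(10);
      output Explicit: name = Last(name), maxSalary = Max(salary);
    }
}
```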

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator requires a windowing configuration.
Parameters
This operator supports 4 parameters.

Optional: aggregateEvictedPartitions, aggregateIncompleteWindows, groupBy, partitionBy

Metrics
This operator reports 1 metric.

Properties

Implementation
C++
Threading
WindowTriggerBound - The operator provides a single-threaded execution context only if a time-based window trigger policy is not used.

Input Ports

Ports (0)

The Aggregate operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is WindowBound. The Aggregate operator processes window punctuation markers when configured with a punctuation-based window.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes. Attributes that are not assigned in the output clause are automatically assigned from the input port attributes that have the same name and type. If there is no such input attribute, an error is reported at compile time.
Output Functions
RelationalAggregation
int32 Count()

Returns the number of tuples in the group or window.

int32 CountGroups()

Returns the number of groups in the window.

int32 CountAll()

Returns the number of tuples in the window.

list<int32> CountByGroup()

Returns a list of the number of tuples in each group.

<any T> T Any(T v)

Returns the attribute of one of the tuples in the group or window.

<any T> T First(T v)

Returns the attribute of the first tuple in the group or window.

<any T> T Last(T v)

Returns the attribute of the last tuple in the group or window.

<any T> list<T> Collect(T v)

Returns a list of the specified attribute for all tuples in the group or window.

<any T> list<T> CollectDistinct(T v)

Returns a list of the distinct attribute values for tuples in the group or window.

<any T> list<int32> CountByDistinct(T v)

Returns a list of the counts of each distinct attribute value for tuples in the group or window.

<any T> int32 CountDistinct(T v)

Returns the number of distinct attribute values for tuples in the group or window.

<numeric T> T Average(T v)

Returns the average of the specified attribute value for tuples in the group or window.

<numeric T> list<T> Average(list<T> v)

Returns the average of the specified attribute value for tuples in the group or window. NOTE: All lists must have the same size.

<numeric T>[N] list<T>[N] Average(list<T>[N] v)

Returns the average of the specified attribute value for tuples in the group or window.

<numeric T> T Sum(T v)

Returns the sum of the specified attribute value for tuples in the group or window.

<numeric T> list<T> Sum(list<T> v)

Returns the sum of the specified attribute value for tuples in the group or window. NOTE: All lists must have the same size.

<numeric T>[N] list<T>[N] Sum(list<T>[N] v)

Returns the sum of the specified attribute value for tuples in the group or window.

<string T> T Sum(T v)

Returns a concatenation of the specified attribute value for tuples in the group or window.

<ordered T> T Max(T v)

Returns the maximum of the specified attribute value for tuples in the group or window.

<numeric T> list<T> Max(list<T> v)

Returns the maximum of the specified attribute value for tuples in the group or window. NOTE: All lists must have the same size.

<numeric T>[N] list<T>[N] Max(list<T>[N] v)

Returns the maximum of the specified attribute value for tuples in the group or window.

<ordered T> T Min(T v)

Returns the minimum of the specified attribute value for tuples in the group or window. NOTE: For list arguments, the Min and Max aggregation functions compute a column-wise minimum or maximum. For example, the column-wise minimum of [1,2,1] and [1,1,2] is [1,1,1].

<numeric T> list<T> Min(list<T> v)

Returns the minimum of the specified attribute value for tuples in the group or window. NOTE: All lists must have the same size.

<numeric T>[N] list<T>[N] Min(list<T>[N] v)

Returns the minimum of the specified attribute value for tuples in the group or window.

<ordered T> int32 MaxCount(T v)

Returns the number of occurrences of the maximum value of the specified attribute for tuples in the group or window.

<ordered T> int32 MinCount(T v)

Returns the number of occurrences of the minimum value of the specified attribute for tuples in the group or window.

<ordered T, any K> K ArgMin(T v, K w)

Returns the value of the expression that corresponds to the minimum specified attribute value for tuples in the group or window.

<ordered T, any K> K ArgMax(T v, K w)

Returns the value of the expression that corresponds to the maximum specified attribute value for tuples in the group or window.

<ordered T, any K> list<K> CollectArgMin(T v, K w)

Returns a list of values of the expression that corresponds to the minimum specified attribute value for tuples in the group or window.

<ordered T, any K> list<K> CollectArgMax(T v, K w)

Returns a list of values of the expression that corresponds to the maximum specified attribute value for tuples in the group or window.

<numeric T> T SampleStdDev(T v)

Returns the Sample Standard Deviation of the specified attribute value for tuples in the group or window.

<numeric T> T PopulationStdDev(T v)

Returns the Population Standard Deviation of the specified attribute value for tuples in the group or window.

<any T> T Custom(boolean initFcn, boolean processFcn, T resultFcn)

User-supplied functions for computing an aggregation over tuples in the group or window. All expressions that are passed in must be function calls. The initFcn and resultFcn expressions cannot reference input attributes. For each window, partition, or group, initFcn is invoked once, processFcn is invoked for each tuple, and resultFcn is invoked to return the computed result.

For every use of the Custom output function, there should be a mutable state variable to hold the context, which carries information between function invocations. initFcn initializes the context, processFcn updates it with values from each tuple, and resultFcn returns the desired result. initFcn and processFcn must return a boolean value, which is ignored.

Ports (0)

The Aggregate operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating. The Aggregate operator generates a window punctuation after each batch of aggregations that it outputs.

Parameters

This operator supports 4 parameters.

Optional: aggregateEvictedPartitions, aggregateIncompleteWindows, groupBy, partitionBy

aggregateEvictedPartitions

This parameter is valid only for tumbling windows with a partition eviction specification. The default value is false. When set to true, aggregations are done when a partition is evicted by the partition eviction specification. The aggregations are performed the same way as they would be if the window had been flushed, except that they are performed only on a single partition. If the partition is empty, no aggregations are performed, and a window marker punctuation is output.
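A minimal sketch of this parameter in use follows. The composite name, the stream names, and the 60-second partitionAge value are assumptions chosen for illustration. Each country partition normally flushes after 100 tuples, and a partition that is evicted for being idle is aggregated before it is dropped.

```spl
composite EvictedPartitionsSketch {
  graph
    // Hypothetical input stream.
    stream<rstring country, int32 salary> Beat = Beacon() {}

    stream<rstring country, int32 maxSalary> Agg = Aggregate(Beat) {
      // partitionAge(60.0) evicts a partition that received no tuples for 60 seconds.
      window Beat: tumbling, count(100), partitioned, partitionAge(60.0);
      param partitionBy: country;
            aggregateEvictedPartitions: true;  // aggregate a partition when it is evicted
      output Agg: maxSalary = Max(salary);
    }
}
```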

aggregateIncompleteWindows

This parameter is valid only for sliding windows. The default value is false. When set to true, aggregations are done when a trigger occurs, even if the window is not filled up. If set to false, triggers before the window is full are ignored.
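The following sketch shows one way to set this parameter. The composite and stream names are hypothetical. With the value set to true, the count(1) trigger produces output before the count(10) window has filled for the first time. With the default of false, those early triggers are ignored.

```spl
composite IncompleteWindowsSketch {
  graph
    // Hypothetical input stream.
    stream<int32 salary> Beat = Beacon() {}

    stream<int32 maxSalary> Agg = Aggregate(Beat) {
      // Eviction policy count(10), trigger policy count(1).
      window Beat: sliding, count(10), count(1);
      param aggregateIncompleteWindows: true;
      output Agg: maxSalary = Max(salary);
    }
}
```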

groupBy

Specifies one or more expressions to be used for dividing the tuples in a window into groups. When a window fires (a sliding window triggers or a tumbling window flushes), one tuple with the user-specified aggregations is computed for each group in the window and these tuples are output as a batch. A window marker punctuation is output after the tuples.
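As a sketch of how grouping interacts with the counting output functions, the following invocation emits one tuple per city each time the window flushes. The composite name, the stream names, and the window size are assumptions. The city attribute has no assignment, so it is forwarded from the input.

```spl
composite GroupCountsSketch {
  graph
    // Hypothetical input stream.
    stream<rstring city, int32 salary> Beat = Beacon() {}

    stream<rstring city, int32 n, int32 nGroups, int32 nAll,
           list<int32> perGroup> Agg = Aggregate(Beat) {
      window Beat: tumbling, count(100);
      param groupBy: city;
      output Agg: n = Count(),              // tuples in this group
                  nGroups = CountGroups(),  // groups in the window
                  nAll = CountAll(),        // tuples in the whole window
                  perGroup = CountByGroup();// tuple count of every group
    }
}
```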

partitionBy

Specifies one or more expressions to be used for partitioning the input tuples into subwindows, where all window and parameter configurations apply to the subwindows, independently. This parameter is only valid for an Aggregate operator that is configured with a partitioned window.

Code Templates

Aggregate
stream<${schema}> ${outputStream} = Aggregate(${inputStream}) {
  window ${inputStream}: ${windowMode};
  output ${outputStream}: ${outputExpression};
}

Aggregate with groupBy
stream<${schema}> ${outputStream} = Aggregate(${inputStream}) {
  window ${inputStream}: ${windowMode};
  param groupBy: ${groupByExpression};
  output ${outputStream}: ${outputExpression};
}

Aggregate with partitionBy
stream<${schema}> ${outputStream} = Aggregate(${inputStream}) {
  window ${inputStream}: ${windowMode};
  param partitionBy: ${partitionByExpression};
  output ${outputStream}: ${outputExpression};
}

Metrics

nCurrentPartitions - Gauge

The number of partitions in the window.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime
Include Path: ../../../impl/include