IBM Streams 4.2.1

Operator Sort

The Sort operator orders tuples based on user-specified ordering expressions and window configurations.

Checkpointed data

When the Sort operator is checkpointed, the contents of its window are saved in the checkpoint. The window is checkpointed incrementally to reduce checkpoint size and latency. Logic state variables (if present) are also included in the checkpoint.

Behavior in a consistent region

A Sort operator can be used in a consistent region, but it cannot be the start of the region. In a consistent region, a Sort operator stores its state when a checkpoint is taken. When the region is reset, the operator restores the state from the checkpoint.
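The placement rule above can be sketched as follows. This is a minimal illustration, not a definitive configuration: it assumes that a Beacon operator may start the region, that the application declares a JobControlPlane operator, and that a 30-second periodic trigger is acceptable. The stream names and the period are illustrative choices.

```spl
composite Main {
  graph
    // Assumption: an application that contains a consistent region also
    // declares a JobControlPlane operator to coordinate it.
    () as JCP = JobControlPlane() {}

    // The @consistent annotation marks Beat as the start of the region.
    // The 30-second period is an arbitrary illustrative value.
    @consistent(trigger = periodic, period = 30.0)
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    // Sort is downstream of the start operator, so it participates in the
    // region without being its start.
    stream<Beat> Sorted = Sort(Beat)
    {
      window
        Beat : tumbling, count(10);
      param
        sortBy : name;
    }
}
```

Placing the annotation on the source rather than on the Sort invocation keeps the Sort operator inside the region while respecting the restriction that it cannot be the start.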

Checkpointing behavior in an autonomous region

When the Sort operator is in an autonomous region and is configured with the config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Sort operator is in an autonomous region and is configured with the config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator reverts to its initial state.
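The two checkpoint clauses might be written as shown in this sketch. The stream names and the 30-second interval are hypothetical; only the clause syntax is taken from the text above.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    // Periodic checkpointing: the window contents are saved every T seconds
    // (T = 30.0 here, an illustrative value) and restored after a restart.
    stream<Beat> SortedPeriodic = Sort(Beat)
    {
      window
        Beat : sliding, count(10);
      param
        sortBy : name;
      config
        checkpoint : periodic(30.0);
    }

    // Operator-driven checkpointing: per the description above, Sort takes
    // no checkpoint at run time and restarts from its initial state.
    stream<Beat> SortedOperatorDriven = Sort(Beat)
    {
      window
        Beat : sliding, count(10);
      param
        sortBy : name;
      config
        checkpoint : operatorDriven;
    }
}
```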

Such checkpointing behavior is subject to change in the future.

Windowing

The Sort operator supports the following window configurations:

tumbling, (count | delta | time | punctuation)-based eviction (, partitioned (, partitionEvictionSpec)? )?

sliding, count-based eviction, count-based trigger of 1 (, partitioned (, partitionEvictionSpec)? )?

For the tumbling variants, tuples are sorted when the window gets full and are output immediately. A window marker punctuation is output at the end.

For the sliding variants, tuples are always kept in sorted order. When the window gets full, every new tuple causes the first one in the sorted order to be removed from the window and output. This type of sort is referred to as progressive sort.

For the partitioned variants, the window specification applies to individual subwindows identified by the partitionBy parameter.
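As a rough sketch of the partitioned variants, the following invocations partition by name. The stream names are illustrative, and the partitionAge(60.0) eviction policy and its 60-second value are assumptions chosen only to show where a partitionEvictionSpec goes in the window clause.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    // Progressive sort with a separate 10-tuple subwindow per distinct name.
    stream<Beat> SortedByName = Sort(Beat)
    {
      window
        Beat : sliding, count(10), partitioned;
      param
        partitionBy : name;
        sortBy      : salary;
    }

    // Tumbling variant with a partition eviction policy. partitionAge(60.0)
    // is assumed here to discard partitions that have been idle for 60 seconds.
    stream<Beat> SortedEvicting = Sort(Beat)
    {
      window
        Beat : tumbling, count(10), partitioned, partitionAge(60.0);
      param
        partitionBy : name;
        sortBy      : salary;
    }
}
```

In both invocations the count(10) limit applies to each subwindow separately, so tuples with one name never displace tuples with another.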

For the tumbling variants, the final punctuation marker does not flush the window (so as not to break invariants on the output). For the sliding variants (progressive), the final punctuation marker does flush the window.

When a tumbling, punctuation-based window that contains no tuples receives a window marker punctuation, only a window marker punctuation is output.

Assignments

The Sort operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.

Examples

This example uses the Sort operator.

composite Main {                                                                          
  graph                                                                                   
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}                    
    // count based window                                                                 
    stream<Beat> Sorted0 = Sort(Beat)                                                     
    {                                                                                     
      window                                                                              
        Beat : tumbling, count(10);                                                       
      param                                                                               
        sortBy : name, (float64)salary/(float64)age;                                      
    }                                                                                     
    // count based partitioned window                                                     
    stream<Beat> Sorted1 = Sort(Beat)                                                     
    {                                                                                     
      window                                                                              
        Beat : tumbling, count(10), partitioned;                                          
      param                                                                               
        partitionBy : name;                                                               
        sortBy      : (float64)salary/(float64)age;                                       
    }                                                                                     
    // count based window, with sort order                                                
    stream<Beat> Sorted2 = Sort(Beat)                                                     
    {                                                                                     
      window                                                                              
        Beat : tumbling, count(10);                                                       
      param                                                                               
        sortBy : name, (float64)salary/(float64)age;                                      
        order  : descending;                                                              
    }                                                                                     
    // count based window, with sort order for each sortBy expression                     
    stream<Beat> Sorted3 = Sort(Beat)                                                     
    {                                                                                     
      window                                                                              
        Beat : tumbling, count(10);                                                       
      param                                                                               
        sortBy : name, (float64)salary/(float64)age;                                      
        order  : ascending, descending;                                                   
    }                                                                                     
    // punctuation based window                                                           
    stream<Beat> Sorted4 = Sort(Beat)                                                     
    {                                                                                     
      window                                                                              
        Beat : tumbling, punct();                                                         
      param                                                                               
        sortBy : name, (float64)salary/(float64)age;                                      
    }                                                                                     
    // time based window                                                                  
    stream<Beat> Sorted5 = Sort(Beat)                                                     
    {                                                                                     
      window                                                                              
        Beat : tumbling, time(10);                                                        
      param                                                                               
        sortBy : name, (float64)salary/(float64)age;                                      
    }                                                                                     
    // delta based window                                                                 
    stream<uint32 id, uint32 age, uint64 salary> BeatId = Beacon() {}                     
    stream<BeatId> Sorted6 = Sort(BeatId)                                                 
    {                                                                                     
      window                                                                              
        BeatId : tumbling, delta(id, 10u);                                                
      param                                                                               
        sortBy : (float64)salary/(float64)age;                                            
    }                                                                                     
    // progressive sort                                                                   
    stream<Beat> Sorted = Sort(Beat)                                                      
    {                                                                                     
      window                                                                              
        Beat : sliding, count(10);                                                        
      param                                                                               
        sortBy : name, (float64)salary/(float64)age;                                      
    }                                                                                     
}

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator requires a windowing configuration.
Parameters
This operator supports 3 parameters.

Required: sortBy

Optional: order, partitionBy

Metrics
This operator reports 1 metric.

Properties

Implementation
C++
Threading
WindowEvictionBound - Operator provides a single threaded execution context only if a time-based window eviction policy is not used.

Input Ports

Ports (0)

The Sort operator is configurable with a single input port, which ingests tuples to be sorted. The Sort operator processes window punctuation markers when configured with a punctuation-based window.

Output Ports

Assignments
This operator does not allow assignments to output attributes. The output tuple attributes are forwarded automatically from the input attributes.
Ports (0)

The Sort operator is configurable with a single output port, which produces sorted tuples. The Sort operator generates a punctuation after each batch of sorted tuples it outputs. The Sort operator requires that the stream type for the output port matches the stream type for the input port.

Parameters

This operator supports 3 parameters.

Required: sortBy

Optional: order, partitionBy

order

Specifies either the global sort order, or the sort order for the individual expressions that appear in the sortBy parameter. The valid values are ascending and descending. When a single value is specified for the order parameter, it determines the global sort order. When multiple values are specified, then the number of values must match the number of sortBy expressions.

partitionBy

Specifies one or more expressions to be used for partitioning the input tuples into subwindows. All window and parameter configurations apply to each subwindow independently. This parameter is valid only for a Sort operator that is configured with a partitioned window.

sortBy

Specifies one or more expressions to be used for sorting the tuples. The sort is lexicographical and, by default, ascending: the first expression is compared first, and if the values are equal the second expression is compared, and so on. The default ascending order means that the output stream produces tuples in non-decreasing order. The sort order can be changed by using the order parameter.

Code Templates

Sort
stream<${schema}> ${streamName} = Sort(${inputStream}) {
    window
        ${inputStream} : ${windowMode};
    param
        sortBy : ${sortExpression};
}

Metrics

nCurrentPartitions - Gauge

Returns the number of partitions in the current window.