IBM Streams 4.2.1

Operator Barrier

The Barrier operator synchronizes tuples from two or more streams. Corresponding tuples from each input port are combined to create an output tuple. The operator creates and sends this output tuple only after it has received a corresponding tuple on every input port.

Checkpointed data

When the Barrier operator is checkpointed, any unmatched tuples are saved in the checkpoint.

Behavior in a consistent region

The Barrier operator can be an operator within the reachability graph of a consistent region. It cannot start a consistent region.

When in a consistent region, the Barrier is typically used to synchronize the results from performing parallel tasks on a stream in the region. Thus it is effectively stateless, as any single upstream tuple is fully processed when all of its parallel derived tuples have arrived at the Barrier and the resultant Barrier output tuple is processed by the downstream portion of the region. Thus no unmatched tuples are checkpointed and a reset of the region leaves the Barrier empty, with no unmatched tuples, ready for a new set of derived tuples from parallel processing.

When the Barrier operator is instead used to synchronize logically unrelated streams, any unmatched tuples are checkpointed when the region is made consistent. If the region is reset, the Barrier operator discards all its unmatched tuples and is repopulated with the unmatched tuples from the last time the region was consistent.
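
The parallel-task case described above can be sketched as follows. This is a minimal illustration, not a reference configuration: the composite name, stream names, attributes, and the 5-second period are hypothetical, and it assumes that a Beacon operator can start the region and that the application includes a JobControlPlane operator.

```spl
composite ConsistentBarrier {
  graph
    // Assumption: an application with a consistent region includes a JobControlPlane.
    () as JCP = JobControlPlane() {}

    // Assumption: a Beacon starts the region, because the Barrier cannot.
    @consistent(trigger = periodic, period = 5.0)
    stream<uint64 id> Src = Beacon() {
      param period : 0.1;
      output Src : id = IterationCount();
    }

    // Two parallel tasks derived from the same upstream tuple.
    stream<uint64 id, uint64 doubled> TaskA = Functor(Src) {
      output TaskA : doubled = id * 2ul;
    }
    stream<uint64 id, uint64 squared> TaskB = Functor(Src) {
      output TaskB : squared = id * id;
    }

    // No bufferSize is set; the Barrier pairs the derived tuples one-to-one.
    stream<uint64 id, uint64 doubled, uint64 squared> Joined = Barrier(TaskA; TaskB) {
      output Joined : id = TaskA.id;
    }

    () as Sink = Custom(Joined) {
      logic onTuple Joined : println(Joined);
    }
}
```

Because both Functor streams derive from the same source tuple, every tuple that reaches the Barrier finds its counterpart, which is why the operator holds no unmatched tuples when the region becomes consistent.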

Checkpointing behavior in an autonomous region

When the Barrier operator is in an autonomous region and is configured with a config checkpoint : periodic(T) clause, a background thread in the SPL Runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Barrier operator is in an autonomous region and is configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
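
A periodic checkpoint configuration might look like the sketch below. The composite name, stream names, and the 30-second interval are hypothetical choices for illustration, and the restartable setting is an assumption about how the operator would typically be deployed rather than a requirement stated on this page.

```spl
composite CheckpointedBarrier {
  graph
    stream<uint64 id> Src = Beacon() {
      param period : 0.5;
      output Src : id = IterationCount();
    }
    stream<uint64 id, uint64 doubled> TaskA = Functor(Src) {
      output TaskA : doubled = id * 2ul;
    }
    stream<uint64 id, uint64 squared> TaskB = Functor(Src) {
      output TaskB : squared = id * id;
    }

    stream<uint64 id, uint64 doubled, uint64 squared> Joined = Barrier(TaskA; TaskB) {
      output Joined : id = TaskA.id;
      config
        // T = 30 seconds (hypothetical); unmatched tuples are saved on each checkpoint.
        checkpoint  : periodic(30.0);
        // Assumption: the operator is restartable so that the checkpoint can be restored.
        restartable : true;
    }
}
```

Replacing periodic(30.0) with operatorDriven would, per the description above, result in no checkpoint being taken and the operator restarting from its initial state.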

Examples

This example uses the Barrier operator.

composite Main {
  graph
    stream<rstring name, rstring gender, uint32 age> BeatA = Beacon() {param period: 1f;}
    stream<rstring name, rstring gender, rstring job> BeatB = Beacon() {param period: 1f;}
    stream<rstring name, rstring gender, uint64 salary> BeatC = Beacon() {}

    // with no buffer size, used for parallel tasks
    stream<rstring name, rstring gender, uint32 age> OpA = Functor(BeatA) {}
    stream<rstring name, rstring gender, rstring job> OpB = Functor(BeatB) {}
    stream<rstring name, rstring gender, uint32 age, rstring job>
        Res1 = Barrier(OpA; OpB) {}

    // with buffer size, used for synchronizing independent sources
    stream<rstring name> OtherBeat = Beacon() {}
    stream<rstring name1, rstring name2, rstring gender>
        Res2 = Barrier(BeatA; OtherBeat)
    {
      param bufferSize : 1000u;
      output Res2 : name1 = BeatA.name, name2 = OtherBeat.name;
    }

    // with partitioning
    stream<rstring name, rstring gender, uint32 age, rstring job>
        Res3 = Barrier(OpA; OpB)
    {
      param partitionByLHS : OpA.name;
            partitionByRHS : OpB.name;
    }

    // with partitioning and more than two ports
    stream<rstring name, rstring gender, uint64 salary> OpC = Functor(BeatC) {}
    stream<rstring name, rstring gender, uint32 age, rstring job, uint64 salary>
        Res4 = Barrier(OpA; OpB; OpC)
    {
      param partitionBy0 : OpA.name, OpA.gender;
            partitionBy1 : OpB.name, OpB.gender;
            partitionBy2 : OpC.name, OpC.gender;
    }
}

Summary

Ports
This operator has 2 or more input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports arbitrary parameters in addition to 3 specific parameters.

Optional: bufferSize, partitionByLHS, partitionByRHS

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0...1)

The Barrier operator is configurable with two or more input ports, which ingest tuples to be synchronized. The Barrier operator does not support custom port logic in its invocations.

Ports (2...)

Additional ports that ingest tuples to be synchronized.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes. Attributes not assigned in the output clause will be automatically assigned from the attributes of the input ports that have the same name and type. If there is no such input attribute, an error is reported at compile-time.
Ports (0)

The Barrier operator is configurable with a single output port, which produces the synchronized stream. The Barrier operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the Barrier operator expects all output tuple attributes to be completely assigned.
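
As an illustration of mixing explicit assignments with automatic forwarding, consider the sketch below. All stream and attribute names are hypothetical. The label attribute has no same-named input attribute, so it must be assigned explicitly; age and salary are left to automatic forwarding.

```spl
composite BarrierOutput {
  graph
    stream<rstring name, uint32 age> People = Beacon() {
      param iterations : 5u;
      output People : name = "SPL", age = 15u;
    }
    stream<rstring name, uint64 salary> Pay = Beacon() {
      param iterations : 5u;
      output Pay : name = "SPL", salary = 1000ul;
    }

    stream<rstring name, uint32 age, uint64 salary, rstring label>
        Out = Barrier(People; Pay)
    {
      output Out :
        // Explicit assignments: one plain attribute and one computed expression.
        name  = People.name,
        label = People.name + ":" + (rstring) Pay.salary;
        // age is forwarded from People and salary from Pay (same name and type).
    }
}
```

Omitting the label assignment in this sketch would leave an output attribute with no matching input attribute, which is the case the operator reports as a compile-time error.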

Parameters

The optional partitionBy<i> parameter, where i is a port number, specifies one or more expressions to be used for partitioning the input tuples from the port at index i, where the synchronization applies to matching partitions from different ports. When this parameter is specified, you must specify one partitionBy parameter for each input port.

The partitionByLHS parameter is a synonym for the partitionBy0 parameter. The partitionByRHS parameter is a synonym for the partitionBy<n> parameter, where n is the index of the last input port.

All partitionBy parameters must have the same number of expressions, and the expressions at corresponding positions across the parameters must have matching types.
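
The matching rule applies to expression types, not attribute names. The following sketch uses hypothetical streams whose partitioning attributes are named differently on each port but agree in count and type (rstring, then uint32).

```spl
composite PartitionedBarrier {
  graph
    stream<rstring customerId, uint32 region, float64 amount> Orders = Beacon() {}
    stream<rstring payerId, uint32 zone, rstring method> Payments = Beacon() {}

    stream<rstring customerId, uint32 region, float64 amount, rstring method>
        Settled = Barrier(Orders; Payments)
    {
      param
        // Position 1: rstring on both ports. Position 2: uint32 on both ports.
        partitionByLHS : Orders.customerId, Orders.region;
        partitionByRHS : Payments.payerId, Payments.zone;
    }
}
```

With two input ports, partitionByLHS and partitionByRHS here are equivalent to partitionBy0 and partitionBy1.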

Optional: bufferSize, partitionByLHS, partitionByRHS

bufferSize

Specifies an expression that limits the size of the internal buffer used to queue tuples from an input port that do not yet have matching tuples from the other ports. When the buffer fills up, the operator blocks until more space is available. By default, the buffer has no limit. The default is often used to synchronize the results from performing parallel tasks on the same stream.

This parameter is often used when the operator is used to synchronize streams that originate from different sources with potentially differing rates. However, this kind of usage implies that the operator is being used to synchronize logically unrelated streams. Such usage is discouraged.

This parameter is not supported when the operator is in a consistent region.

partitionByLHS

Specifies expressions to be used for partitioning the input tuples from the left port (the first input port, index 0), where the synchronization applies to matching partitions from different ports.

When this parameter is specified, one partitionBy parameter must appear for each input port. All partitionBy parameters must have the same number of expressions, and the expressions at corresponding positions across the parameters must have matching types.

partitionByRHS

Specifies expressions to be used for partitioning the input tuples from the right port (the last input port), where the synchronization applies to matching partitions from different ports.

When this parameter is specified, one partitionBy parameter must appear for each input port. All partitionBy parameters must have the same number of expressions, and the expressions at corresponding positions across the parameters must have matching types.

Code Templates

Barrier
stream<${schema}> ${outputStream} = Barrier(${inputStream1};${inputStream2}) {
  param
    ${parameter}:${parameterExpression};
  output
    ${outputStream}: ${outputExpression};
}