Operator Delay

The Delay operator delays a stream by a specified amount of time while keeping the inter-arrival times of tuples and punctuation intact.

Checkpointed data

When the Delay operator is checkpointed, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The Delay operator can be an operator within the reachability graph of a consistent region, but it cannot be the start of a consistent region. During drain processing, the Delay operator waits until all delayed tuples are submitted. During reset processing, the operator discards all delayed tuples. Logic state variables (if present) are also automatically checkpointed and restored.

Note:
  • It is recommended that the Delay operator have a small bufferSize so that it does not delay the completion of a drain for a significant time.
  • It is recommended that the Delay operator be run by the same thread as the start operator of the region (that is to say, fused and without threaded ports) to reduce the number of pending input tuples to be processed during a drain and reset.
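
As a minimal sketch of these recommendations, the following application places a Delay operator inside a periodic consistent region, gives it a small bufferSize, and colocates it with the start operator in one partition. The composite and stream names, the 0.5-second delay, the 10-second period, the bufferSize of 50, and the partition name are illustrative assumptions, not recommended values. The sketch also assumes that a Beacon is acceptable as the start operator of the region.

```spl
composite DelayInRegion {
  graph
    // An application with a consistent region needs a JobControlPlane operator.
    () as JCP = JobControlPlane() {}

    // Start operator of the region (assumed here to be a Beacon).
    @consistent(trigger = periodic, period = 10.0)
    stream<rstring name, uint32 age> Beat = Beacon() {
      param  period : 0.1;
      config placement : partitionColocation("delayRegion");
    }

    // Small buffer, so a drain does not wait long for delayed tuples.
    // Same partition as Beat and no threadedPort config on the input.
    stream<rstring name, uint32 age> Delayed = Delay(Beat) {
      param  delay      : 0.5;
             bufferSize : 50u;
      config placement : partitionColocation("delayRegion");
    }

    () as Sink = Custom(Delayed) {
      logic onTuple Delayed : println(Delayed);
    }
}
```

Colocation only requests that the two operators are fused into one partition. Whether they actually run on the same thread can also depend on the threading model in effect for that partition.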

Checkpointing behavior in an autonomous region

When the Delay operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL Runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Delay operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
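
The periodic case can be sketched as follows. The composite and stream names, the 30-second interval, and the logic state variable seen are illustrative assumptions. The restartable : true setting is included on the assumption that the operator should be restarted after a failure so that the last checkpoint can be restored.

```spl
composite DelayCheckpointed {
  graph
    stream<rstring name, uint32 age> Beat = Beacon() {
      param period : 1.0;
    }

    stream<rstring name, uint32 age> Delayed = Delay(Beat) {
      logic
        // Hypothetical state variable; saved with each checkpoint.
        state : mutable uint64 seen = 0ul;
        onTuple Beat : seen++;
      param
        delay : 2.5;
      config
        checkpoint  : periodic(30.0); // checkpoint every 30 seconds
        restartable : true;
    }

    () as Sink = Custom(Delayed) {
      logic onTuple Delayed : println(Delayed);
    }
}
```

Replacing periodic(30.0) with operatorDriven in this sketch would, per the description above, result in no checkpoint being taken, so seen would start again from 0 after a restart.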

Examples

This example uses the Delay operator.


composite Main {
  graph
    stream<rstring name, uint32 age> Beat = Beacon() {}
    // with delay only
    stream<rstring name, uint32 age> Delayed1 = Delay(Beat)
    {
      param delay : 2.5;
    }
    // with delay and buffer size
    stream<rstring name, uint32 age> Delayed2 = Delay(Beat)
    {
      param delay      : 2.5;
            bufferSize : 250u;
    }
}

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 2 parameters.

Required: delay

Optional: bufferSize

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Never - Operator never provides a single-threaded execution context.

Input Ports

Ports (0)

The Delay operator is configurable with a single input port, which ingests tuples to be delayed.

Output Ports

Assignments
This operator does not allow assignments to output attributes.
Ports (0)

The Delay operator is configurable with a single output port, which produces delayed tuples. This output port schema must match the input port schema.

Parameters

bufferSize

Specifies the maximum number of tuples and punctuation that can be buffered by the operator. The default value is 1000. When the buffer is full, incoming tuples and punctuation are blocked until there is space in the buffer. The final punctuation marker is not forwarded until the buffer is drained.
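
One way to reason about a suitable bufferSize: because each tuple is held for delay seconds, a steady input of r tuples per second keeps roughly r × delay tuples in the buffer, and a smaller bufferSize would block the input. The sketch below assumes a Beacon that emits about 100 tuples per second. The rate, the composite and stream names, and the headroom factor of 2 are illustrative assumptions, not measured or recommended values.

```spl
composite DelaySizing {
  graph
    // period : 0.01 gives roughly 100 tuples per second.
    stream<rstring name, uint32 age> Beat = Beacon() {
      param period : 0.01;
    }

    // About 100 tuples/s x 2.5 s = 250 tuples in flight at steady state.
    // 500u leaves headroom for bursts and punctuation (assumed factor of 2).
    stream<rstring name, uint32 age> Delayed = Delay(Beat) {
      param delay      : 2.5;
            bufferSize : 500u;
    }

    () as Sink = Custom(Delayed) {
      logic onTuple Delayed : println(Delayed);
    }
}
```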

delay

Specifies the delay to be introduced, in seconds.

Code Templates

Delay

stream<${schema}> ${outputStream} = Delay(${inputStream}) {
  param
    delay : ${delayInSeconds};
}