IBM Streams 4.2.1

Operator Custom

Primitive operator image not displayed. Problem loading file: ../../image/tk$spl/op$spl.utility$Custom.svg

The Custom operator is a special logic-related operator that can receive and send any number of streams and does not do anything by itself. Thus, it offers a blank slate for customization.

The Custom operator can submit tuples from within its onTuple, onPunct, and onProcess clauses. It requires special support from the compiler. The submit intrinsic function is available only in the Custom operator, and has the following signatures:

<tuple T> void submit (T tupleValue, T port)                                      //1
<tuple T> void submit (enum{WindowMarker, FinalMarker} punctuation, T port )      //2
<tuple T> void submit (T tupleValue, uint32 port)                                 //3
<tuple T> void submit (enum{WindowMarker, FinalMarker} punctuation, uint32 port)  //4

It is preferable to use the first two versions of the submit function because the symbolic name of the output stream is used. This symbolic name allows the output stream order to be changed without needing to update calls to submit and the tuple type to be checked at compile time.

If the Custom operator has no input streams, then a logic onProcess : stmt clause is allowed. For example:
streams<int32 a, int32 b> A = Custom() {
     logic onProcess : {
          mutable int32 i = 0;
          for (int32 j in range (1000)) {
               int32 a = i++;
               int32 b = i++;
               submit ({a = a, b = b}, A);
          }
     }
}

If the Custom operator has one or more input streams, then the onTuple and onPunct clauses are allowed.

Checkpointed data

When the Custom operator is checkpointed, logic state variables (if present) are saved in checkpoint.

Behavior in a consistent region

The Custom operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region. Custom operators with an 'onProcess' clause are not supported. The Streams instance automatically checkpoints and resets logic state variables.

Checkpointing behavior in an autonomous region

When the Custom operator is in an autonomous region and configured with config checkpoint : periodic(T) clause, a background thread in SPL Runtime checkpoints the operator every T seconds, and such periodic checkpointing activity is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Custom operator is in an autonomous region and configured with config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.

Examples

This example uses the Custom operator.

composite Main {                                                             
  graph                                                                      
    stream<rstring color, int32 intensity> Sensors = Beacon() {}             
    stream<rstring color> Queries = Beacon() {}                              
    stream<rstring key, int32 val> Output = Custom(Sensors; Queries) {       
      logic state          : mutable map<rstring, int32> m;                  
            onTuple Sensors: m[color] = intensity;                           
            onTuple Queries: if (color in m)                                 
                               submit({key=color, val=m[color]}, Output);    
            onPunct Queries: if (currentPunct() == Sys.WindowMarker)         
                               submit(Sys.WindowMarker, Output);             
    }                                                                        
}

Summary

Ports
This operator has 1 or more input ports and 1 or more output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator does not support parameters.
Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0...)

The Custom operator can have zero or more input ports, which ingest input tuples.

Properties

Output Ports

Assignments
This operator does not allow assignments to output attributes.
Ports (0...)

The Custom operator can have zero or more output ports, which produce submitted tuples. If the tuple is referenced after a submit call, the compiler ensures that the tuple is unchanged by submitting a copy of the tuple.

Properties

Code Templates

Custom Sink
() as ${opInstanceName}Sink = Custom(${inputStream})  {
            logic
                onTuple ${inputStream}: {
                    // Add code here
                    ${cursor}
                }
        }
      

Custom with Input and Output
stream<${schema}> ${outputStream} = Custom(${inputStream})  {
            logic
                onTuple ${inputStream}: {
                    // Add code here
                    ${cursor}
                }
        }
      

Custom as a Source
stream<${schema}> ${outputStream} = Custom()  {
            logic
                onProcess : {
                    // Add code here
                    ${cursor}
                }
        }