IBM Streams 4.2

Operator invocations

The purpose of SPL is to allow users to create streams of data, which are connected and manipulated by operator invocations.

SPL programs are deployed on distributed hardware for scaling. The main goals of SPL are scalability (using distributed hardware), extensibility (encapsulating low-level code in high-level operators), and usability (easing the writing of scalable and extensible code).

A stream is an infinite sequence of tuples. As seen in the topic Types, a tuple is simply an instance of a tuple type, consisting of named attributes. Each stream is the result of an operator invocation. An operator invocation observes data from zero or more input streams and defines zero or more output streams. Every stream is defined by exactly one operator invocation, but can be consumed by any number of operator invocations.

Each time a tuple arrives on any one of the input streams of an operator invocation, the operator fires, and can produce tuples on its output streams. An operator invocation in SPL returns streams, analogously to how a function invocation in a traditional language returns values. However, because a stream is an infinite sequence of tuples, each operator invocation stays active for the duration of the program execution, firing once for each of its input tuples.

You define streams of tuples by invoking operators. The syntax for defining a stream, for example:
stream<int32 i> Strm1 = SomeOperator(Strm2)
is designed to resemble the syntax for defining a variable, for example:
list<int32> list1 = someFunction(list2)
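The rules above (zero or more inputs, zero or more outputs, one definition per stream, any number of consumers) can be sketched in a small composite. This is a minimal illustration, not part of the original topic: the composite name FanOut, the stream names, and the output file names are hypothetical, and the sketch assumes the standard toolkit operators Beacon, Filter, Functor, and FileSink.

```spl
composite FanOut {
  graph
    // Beacon has zero input streams and defines one output stream.
    stream<int32 i> Numbers = Beacon() {
      param  iterations : 10u;
      output Numbers    : i = (int32) IterationCount();
    }
    // Numbers is defined exactly once, but two invocations consume it.
    stream<int32 i> Evens = Filter(Numbers) {
      param filter : i % 2 == 0;
    }
    stream<int32 i, int32 sq> Squares = Functor(Numbers) {
      output Squares : sq = i * i;
    }
    // FileSink has one input stream and defines zero output streams.
    () as EvenSink = FileSink(Evens) {
      param file : "evens.txt";   // hypothetical file name
    }
    () as SquareSink = FileSink(Squares) {
      param file : "squares.txt"; // hypothetical file name
    }
}
```

Filter and Functor each fire once per tuple that arrives on Numbers, independently of each other.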
Here is a typical example for an operator invocation, repeated from the introduction:
stream<rstring buyer, rstring seller, rstring item> Sale = Join(Bid; Ask) {
  window Bid  : sliding, time(30);
         Ask  : sliding, count(50);
  param match : Bid.item == Ask.item && Bid.price >= Ask.price;
  output Sale : item = Bid.item;
}

Each operator invocation has a head and a body. The head (Line 1 in the example) lists the connected streams and the operator that is used to process these streams. The body (Lines 2-6 in the example) elaborates on how the operator is to be invoked. The BNF syntax is:

opInvoke    ::= opInvokeHead opInvokeBody
The default toolkits included with the compiler provide a wide variety of operators, and developers can extend SPL with new operators. Each operator can be customized and reused in different places in a data flow graph. To support this customization, SPL provides a versatile configuration syntax. All the configuration happens in the operator invocation body, which keeps it from tangling with the data flow specification in the operator invocation head. The operator body can have five clauses, though some are usually omitted. SPL separates the body of operator invocations into clauses to make them easy to read. This example illustrates all five clauses.
composite SaleJoin {
  graph
    stream<rstring buyer, rstring item, decimal64 price>
      Bid = FileSource() { param file : "bids.txt"; }
    stream<rstring seller, rstring item, decimal64 price>
      Ask = FileSource() { param file : "asks.txt"; }
    stream<rstring buyer, rstring seller, rstring item, uint64 id>
      Sale = Join(Bid; Ask)
    {
      logic  state       : mutable uint64 n = 0;
             onTuple Bid : n++;
      window Bid         : sliding, time(30);
             Ask         : sliding, count(50);
      param  match       : Bid.item == Ask.item && Bid.price >= Ask.price;
      output Sale        : item = Bid.item, id = n;
      config wrapper     : gdb;
    }
}
Figure 1. Clause activation sequence example for Join operator
The sequence diagram shows the execution order for the Join operator. When a tuple arrives, the logic statement is executed first, followed by inserting tuples into and retrieving tuples from the windows. After the interactions with the windows are completed, the param match expressions are executed, and finally the operator submits a tuple when the match succeeds.
The five operator invocation body clauses are:
  • The logic clause consists of local state that persists over the whole program execution, and statements that execute when a tuple arrives on an input port (see Logic clause).
  • The window clause specifies how many previously received tuples of each port to remember for later processing by stateful operators such as Join, Sort, or Aggregate (see Window clause).
  • The param clause contains code snippets, such as expressions, supplied to the operator at invocation time; at run time, the operator executes them whenever needed for its behavior (see Param clause).
  • The output clause assigns values to attributes in output tuples each time the operator submits data to an output stream (see Output clause).
  • The config clause gives directives and hints that influence how the compiler builds this operator invocation, or how the runtime system executes it (see Config clause).
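Because most invocations need only some of the clauses, a sketch with a subset might look like the following. It is an illustration under stated assumptions: the composite name BestBid, the stream Best, the attribute maxPrice, and the file best.txt are hypothetical, and the sketch assumes the standard toolkit Aggregate operator with its Any and Max output functions.

```spl
composite BestBid {
  graph
    // Same Bid source as in the SaleJoin example above.
    stream<rstring buyer, rstring item, decimal64 price>
      Bid = FileSource() { param file : "bids.txt"; }
    // Only window, param, and output are present; logic and config are omitted.
    stream<rstring item, decimal64 maxPrice>
      Best = Aggregate(Bid)
    {
      window Bid     : tumbling, count(10);
      param  groupBy : item;
      output Best    : item = Any(item), maxPrice = Max(price);
    }
    () as Sink = FileSink(Best) { param file : "best.txt"; } // hypothetical file name
}
```

The clauses that remain still appear in the same relative order as in the five-clause example: window, then param, then output.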

Clause order follows the typical execution order. Execution order is operator-specific; Figure 1 shows a simplified execution order for the Join operator, which is typical for other operators as well. When a tuple arrives, logic executes first, followed by interacting with the window, executing param expressions, and finally assigning output attributes when the operator submits a tuple. Since the user rarely needs to see configuration options to understand what the code does, they have their own clause at the end of the operator invocation body. The operator invocation body syntax is:

opInvokeBody   ::= '{'
                    ( 'logic' opInvokeLogic+ )?
                    ( 'window' opInvokeWindow+ )?
                    ( 'param' opInvokeActual+ )?
                    ( 'output' opInvokeOutput+ )?
                    ( 'config' configuration+ )?
                   '}'
opInvokeLogic  ::= opInvokeCode | opInvokeState
opInvokeCode   ::= 'onProcess' ':' stmt
                   | ( 'onTuple' | 'onPunct' ) ID ':' stmt    # ID is input port
opInvokeState  ::= 'state' ':' ( varDef | '{' varDef+ '}' )
opInvokeWindow ::= ID ':' expr+, ';'                          # ID is input port
opInvokeActual ::= ID ':' opActual ';'                        # ID is param name
opInvokeOutput ::= ID ':' ( ID '=' expr )+, ';'               # ID is output port
configuration  ::= ID ':' expr+, ';'                          # ID is config name

The syntax of an operator invocation body vaguely resembles that of a switch statement in Java™, or a class with public/private sections in C++. In addition to the syntactic constraints shown here, the compiler enforces semantic constraints on each clause. For example, only the window configurations that are described in the topic Window clause are legal in the window clause.
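The logic clause alternatives in the grammar (a braced state block, onTuple, and onPunct, each followed by an input port name) can be sketched with a Custom operator. This is a minimal illustration, not taken from the original topic: the composite name TupleCounter and the variables seen and total are hypothetical, and the sketch assumes the standard toolkit Beacon and Custom operators and the built-in functions currentPunct and println.

```spl
composite TupleCounter {
  graph
    stream<int32 i> Numbers = Beacon() {
      param  iterations : 5u;
      output Numbers    : i = (int32) IterationCount();
    }
    () as Counter = Custom(Numbers) {
      logic state : {
               // Braced form of opInvokeState: several variable definitions.
               mutable uint64 seen  = 0;
               mutable int32  total = 0;
             }
             // Runs once for each tuple that arrives on input port Numbers.
             onTuple Numbers : {
               seen++;
               total += i;
             }
             // Runs for each punctuation on Numbers; report at end of stream.
             onPunct Numbers : {
               if (currentPunct() == Sys.FinalMarker) {
                 println("tuples: " + (rstring) seen + ", sum: " + (rstring) total);
               }
             }
    }
}
```

The state variables persist across firings, which is why the counts accumulate until the final punctuation arrives.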