IBM Streams 4.2

Implementing operators

An operator is implemented as a C++ class that extends from the SPL runtime class SPL::Operator.

The member functions that are most commonly used to implement the operator logic are:
  • Constructor and destructor
  • Port readiness notification: allPortsReady
  • Tuple processing: process(Tuple)
  • Punctuation processing: process(Punctuation)
  • Termination notification: prepareToShutdown
Figure 1 shows the order of invocation of the member functions:
Figure 1. Order of invocation (described in the text that follows)

An operator instance starts its runtime lifecycle with a call to its constructor. The operator cannot receive or send tuples and punctuation until its ports are ready. When the operator ports are ready, the operator starts receiving calls to its tuple and punctuation processing functions and can submit tuples accordingly. The allPortsReady function is called as a notification to the operator that its ports are ready to receive and submit tuples. The operator's tuple and punctuation processing functions can be invoked before the allPortsReady notification is received. If a tuple submission is to be made outside the context of a tuple or punctuation processing function, then the allPortsReady notification must be received first. When the processing element (PE) that hosts the operator is being shut down, the operator receives a call to its prepareToShutdown function, which can take place while tuple and punctuation processing functions are active. Eventually, the destructor is called to free any resources that are allocated by the operator.

Guidelines for Submitting and Processing Tuples and Punctuation

  • All operators are constructed before any tuple/punctuation can be received.
  • Do not submit tuples or punctuation from a constructor; doing so causes a C++ exception.
  • You can start submitting tuples and punctuation when the allPortsReady function is called.
  • You might receive tuples and punctuation before the allPortsReady function is called.
  • After a final punctuation is received on an input port, no more tuples or punctuation will be received from that input port.
  • After a final punctuation is submitted on an output port, all subsequent tuples and punctuation that are submitted to that port will be silently discarded.
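
The last guideline can be pictured with a small model. The following is a minimal sketch in standard C++, not SPL runtime code: SketchOutputPort and its member functions are hypothetical stand-ins that only mimic the documented behavior of discarding submissions after a final punctuation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for an output port; NOT the SPL runtime implementation.
class SketchOutputPort {
public:
    // Mirrors the guideline: once a final punctuation has been submitted,
    // later submissions are dropped without raising an error.
    void submitTuple(const std::string& t) {
        if (!finalSeen_) delivered_.push_back(t);
    }
    void submitFinalPunctuation() { finalSeen_ = true; }
    std::size_t deliveredCount() const { return delivered_.size(); }
private:
    bool finalSeen_ = false;
    std::vector<std::string> delivered_;
};

// Submits two tuples, a final punctuation, and one more tuple.
inline std::size_t demoFinalPunctuation() {
    SketchOutputPort port;
    port.submitTuple("a");
    port.submitTuple("b");
    port.submitFinalPunctuation();
    port.submitTuple("c"); // discarded silently
    return port.deliveredCount();
}
```

Because the discard is silent, an operator that keeps submitting after its own final punctuation gets no error to signal the mistake.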

Processing Input Tuples

An operator can implement three process functions to handle tuples and punctuation:
  • void process(Tuple const & tuple, uint32_t port) is the tuple processing function for input ports that are non-mutating. The tuple parameter represents the tuple that is received on the input port at index port. Port indexing is zero-based.
  • void process(Tuple & tuple, uint32_t port) is the tuple processing function for input ports that are mutating.
  • void process(Punctuation const & punct, uint32_t port) is the punctuation processing function for all input ports. The punct parameter represents the punctuation that is received on the input port at index port.

An input port is either mutating or non-mutating, as specified by the operator model. For an input port that is declared as mutating in the operator model, the operator developer must implement the process function that takes a non-const tuple reference, and the implementation is allowed to modify the tuple. For an input port that is declared as non-mutating, the operator developer must implement the process function that takes a const tuple reference, and the implementation is not allowed to modify the tuple. It is common for an operator to implement only one of the two tuple processing functions. However, in the general case, an operator can have both mutating and non-mutating input ports, which requires implementing both functions.

A tuple that is passed in as a parameter to a process function can be used during the lifetime of the process function call. Do not store a pointer or a reference to the tuple for use in a context other than the current process call. A tuple received from the process call can be safely submitted when the submit call is performed in the context of the process call. If the tuple is to be stored as part of the operator state and made available across process calls, then make a copy.
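
The copy rule can be illustrated as follows. This is a minimal sketch in standard C++: SketchTuple and SketchOperator are hypothetical stand-ins, not SPL::Tuple or SPL::Operator, and the process signature only imitates the shape of the real one.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a generated tuple type; not SPL::Tuple.
struct SketchTuple {
    std::int32_t id;
    std::string payload;
};

class SketchOperator {
public:
    // Stand-in for process(Tuple const &, uint32_t).
    void process(const SketchTuple& tuple, std::uint32_t /*port*/) {
        // Store a copy: the reference is valid only for the duration of this call.
        history_.push_back(tuple);
    }
    const std::vector<SketchTuple>& history() const { return history_; }
private:
    std::vector<SketchTuple> history_; // owned copies, safe across calls
};

inline std::string demoStoredCopy() {
    SketchOperator op;
    {
        SketchTuple incoming{1, "first"};
        op.process(incoming, 0);
        incoming.payload = "overwritten by the caller"; // stored copy is unaffected
    } // incoming is destroyed here; a stored pointer would now dangle
    return op.history().front().payload;
}
```

Storing `&tuple` instead of a copy would compile, but the operator state would refer to an object that no longer exists after the call returns.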

Stateful Operators and Concurrency

The process functions of different input ports can be called concurrently with each other by the runtime system due to multi-threaded execution. Furthermore, the process function for an input port can be called concurrently with itself if the port has fan-in (more than one stream connected to it). In general, if an operator maintains state that is accessed by the process functions associated with its input ports, then this state must be protected against concurrent accesses to ensure execution correctness. Having windows also counts as having state.

SPL provides three utility classes to help create critical sections: Mutex, AutoMutex, and AutoPortMutex. Mutex is a class that wraps the functionality of a pthread mutex. AutoMutex is a class that creates a critical section from a Mutex object. It locks the mutex when constructed and unlocks it when it goes out of scope (that is, when the object is destructed). AutoPortMutex is similar to AutoMutex, but it reduces to an untaken branch when the SPL run time knows that there cannot be concurrent calls to the process functions of the operator.

Whether the input process functions of an operator are called concurrently depends on the operator fusion configuration that is employed, the properties of the upstream operators, the transport options, and the existence of threaded ports. As a result, at operator development time, it is not possible to tell whether the operator's input process functions are called concurrently. It is important that the developer always protects the state against concurrent accesses. The SPL run time makes sure that the locking and unlocking cost is not paid when no concurrency is involved. For this optimization to take place seamlessly, use the AutoPortMutex class. An example use is as follows:

class MY_OPERATOR ... {
  ...  
private:
  Mutex mutex_;
};

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
  ...
  { // mutex_.lock() is called upon entry if needed
    AutoPortMutex am(mutex_, *this);
    ... // access state
  } // mutex_.unlock() is called upon exit if needed
  ...
}
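
The idea of a guard that "reduces to an untaken branch" can be sketched in standard C++. This is an illustration under assumptions, not the SPL implementation: SketchPortGuard and SketchCounter are hypothetical names, and the concurrent flag stands in for whatever the SPL run time knows about the operator's threading.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the idea behind AutoPortMutex; NOT the SPL class.
class SketchPortGuard {
public:
    SketchPortGuard(std::mutex& m, bool concurrent)
        : mutex_(m), locked_(concurrent) {
        if (locked_) mutex_.lock();   // branch not taken when locking is unnecessary
    }
    ~SketchPortGuard() {
        if (locked_) mutex_.unlock(); // unlock only what this guard locked
    }
    SketchPortGuard(const SketchPortGuard&) = delete;
    SketchPortGuard& operator=(const SketchPortGuard&) = delete;
private:
    std::mutex& mutex_;
    bool locked_;
};

class SketchCounter {
public:
    explicit SketchCounter(bool concurrent) : concurrent_(concurrent) {}
    void process() {
        SketchPortGuard guard(mutex_, concurrent_); // critical section starts
        ++count_;                                   // access state
    }                                               // critical section ends
    int count() const { return count_; }
    // Reports whether the mutex is currently unlocked.
    bool mutexIsFree() {
        if (!mutex_.try_lock()) return false;
        mutex_.unlock();
        return true;
    }
private:
    std::mutex mutex_;
    bool concurrent_;
    int count_ = 0;
};

inline int demoGuard(bool concurrent) {
    SketchCounter c(concurrent);
    for (int i = 0; i < 3; ++i) c.process();
    return c.mutexIsFree() ? c.count() : -1;
}
```

The operator code is identical in both modes; only the guard decides whether the lock is taken, which is why the text recommends always writing the critical section.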

Source Operators and Operator Threads

So far, this topic has described operator logic that is triggered by calls to the process functions associated with the input ports. However, source operators do not have input ports. Furthermore, some operators might prefer to perform work that is not triggered by incoming tuples. For these purposes, SPL's operator API provides an additional process function that is not associated with an input port. It is referred to as the non-input process function.

void process(uint32_t idx) is a process function that runs in a thread of its own. The idx parameter is the thread index, local to the operator.

As implied by the index argument of the non-input process function, there can be more than one thread that runs it. These threads are created by calls to the uint32_t createThreads(uint32_t num) function, which creates num threads. These threads are referred to as operator threads. Each operator thread calls the non-input process function once, passing it its local thread index.

With this functionality, a source operator might be implemented as follows:

void MY_OPERATOR::allPortsReady() {
  createThreads(1);
}

void MY_OPERATOR::process(uint32_t) {
  while(!getPE().getShutdownRequested()) {
    ...
  }
}

In the example, the createThreads function is called from within the allPortsReady function. While it is not strictly necessary, it is good practice. If the non-input process function contains code that submits tuples (which is very common), delaying the creation of the thread ensures that no tuples are submitted before the connections to the operators are set up. It is not valid to submit tuples before allPortsReady is called. Another important point to note is the use of getPE().getShutdownRequested() to check for the shutdown status of the processing element that manages this operator. If the non-input process function does not return upon shutdown of the PE, the operator and the PE are terminated forcefully when necessary, which is undesirable as the shutdown processing specified by the operators is not run. See Blocking and shutdown handling for further details.

Additional operator threads can be used to create multi-threaded operators. If the operator logic involves submitting tuples from more than one thread, care must be taken in specifying the threading behavior of the operator in the operator model.

The SPL language run time considers the execution of a source operator complete when all threads of the operator complete their execution. Since source operators do not have tuple or punctuation processing functions, the completion of the operator threads marks the end of the execution of the operator. The language run time ensures that final punctuation is sent on all output ports upon completion of the execution of the source operator. For more information about final punctuation, see Punctuation processing. The run time keeps track only of threads that are created with the SPL runtime APIs. If the operator relies on other threads, such as those created by third-party libraries, then an SPL operator thread must be created to wait on the external threads. Otherwise, the run time prematurely sends final punctuation on the output ports.
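
The wait-on-external-threads pattern can be sketched in standard C++. Everything here is a hypothetical stand-in: SketchExternalLibrary plays the role of a third-party library that owns its threads, and sketchProcess plays the role of the non-input process function run by an SPL operator thread.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical third-party library that runs its own worker threads.
class SketchExternalLibrary {
public:
    void start(int workers, std::atomic<int>& produced) {
        for (int i = 0; i < workers; ++i) {
            threads_.emplace_back([&produced] {
                for (int n = 0; n < 100; ++n) ++produced; // pretend to emit data
            });
        }
    }
    // Blocks until every worker has finished.
    void waitUntilDone() {
        for (std::thread& t : threads_) t.join();
        threads_.clear();
    }
private:
    std::vector<std::thread> threads_;
};

// Stand-in for the non-input process function of a source operator.
// Returning from it is what marks the operator thread as complete, so it
// must not return while the library threads are still producing.
inline int sketchProcess() {
    std::atomic<int> produced(0);
    SketchExternalLibrary lib;
    lib.start(3, produced);
    lib.waitUntilDone(); // keep the operator thread alive until the work ends
    return produced.load();
}
```

If sketchProcess returned right after start, the tracked thread would end while the workers were still running, which is the premature final punctuation case that the text warns about.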

Similarly, non-source operators are considered complete when all operator threads are terminated and final punctuation is received on all input ports and is processed in full. In the general case, final punctuation is forwarded from the input ports to the output ports, automatically by the SPL language run time. However, if the operator model turns off auto-forwarding of final punctuation, then the operator developer is responsible for submitting final punctuation. The run time does not submit final punctuation on the output ports of the completed non-source operator.

Submitting Tuples and Punctuation

Tuples and punctuation can be submitted to output ports of the operator through the submit functions. There are two submit functions for tuples:
  • void submit(Tuple const & tuple, uint32_t port): The tuple parameter represents the tuple to be submitted on the output port at index port, where the indexing is zero-based.
  • void submit(Tuple & tuple, uint32_t port) is similar to the first function, but the tuple passed in can be modified as a result of the submit call.

For output ports that are non-mutating, both submit functions are valid and are equivalent in functionality. For mutating ports, only the non-const version is valid. For both versions, passing a tuple whose concrete type is not the same as the tuple type of the port that is specified results in a runtime error.

An output port that is declared as mutating in the operator model indicates that the operator developer must use the submit function that takes a non-const tuple reference for submitting tuples on that port. Expect that the submit call modifies the tuple as part of its processing. An output port that is declared as non-mutating in the operator model guarantees that the submit call does not modify the tuple as part of its processing, so the developer can use either submit function. The submit function that is used for a submit call is determined by the function resolution rules of the C++ language. Here are the different cases:

Table 1. Submit call validity
Port mutability | Tuple                                    | Call site    | C++ resolution   | Validity
Mutating        | Non-const, for example: Tuple & t =...   | submit(t, 0) | Non-const submit | valid
Mutating        | Const, for example: Tuple const & t =... | submit(t, 0) | Const submit     | invalid
Non-mutating    | Non-const, for example: Tuple & t =...   | submit(t, 0) | Non-const submit | valid
Non-mutating    | Const, for example: Tuple const & t =... | submit(t, 0) | Const submit     | valid

As seen from Table 1, you need to pay close attention when you submit a const tuple to a mutating port. It is invalid and the SPL language run time throws an exception, resulting in a runtime error. The SPL compiler forces a C++ compilation error when the operator has no non-mutating output ports, by shadowing the non-const submit function in the generated operator code. This action results in an unresolved submit call, rather than a runtime error later on.
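
The C++ rules behind Table 1 can be demonstrated with stand-in types. This is a sketch in standard C++ only: SketchTuple, SketchBase, and SketchAllMutating are hypothetical, and the way the real generated operator code declares its submit function is not shown in this topic. The sketch relies on a general C++ rule: a member function declared in a derived class hides all base-class overloads of the same name.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

struct SketchTuple { int value; }; // hypothetical stand-in, not SPL::Tuple

// Stand-in for a base class that offers both submit overloads.
struct SketchBase {
    std::string submit(const SketchTuple&, std::uint32_t) { return "const submit"; }
    std::string submit(SketchTuple&, std::uint32_t) { return "non-const submit"; }
};

// Stand-in for an operator whose output ports are all mutating.
// Declaring submit here hides every SketchBase::submit overload, so only
// the non-const version is reachable through this class.
struct SketchAllMutating : SketchBase {
    std::string submit(SketchTuple& t, std::uint32_t port) {
        return SketchBase::submit(t, port);
    }
};

inline std::string resolveNonConst() {
    SketchBase op;
    SketchTuple t{1};
    return op.submit(t, 0);  // non-const lvalue selects the non-const overload
}

inline std::string resolveConst() {
    SketchBase op;
    SketchTuple t{1};
    const SketchTuple& ct = t;
    return op.submit(ct, 0); // const reference can bind only to the const overload
}

inline std::string resolveHidden() {
    SketchAllMutating op;
    SketchTuple t{2};
    // const SketchTuple& ct = t;
    // op.submit(ct, 0);     // would not compile: no visible overload accepts ct
    return op.submit(t, 0);
}
```

The commented-out call shows how a mistake that would otherwise surface as a runtime exception can be turned into a compilation error.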

For punctuation, there is a single submit function.

void submit(Punctuation const & punct, uint32_t port): The punct parameter represents the punctuation to be submitted on the output port at index port.

Port Mutability

The tuple mutation settings for input and output ports are defined in the operator model. To summarize, for input ports, setting the tupleMutationAllowed property to true means declaring your intent to modify the incoming tuples. For output ports, setting the tupleMutationAllowed property to true means declaring your permission to let others modify the submitted tuples. Port mutability does not restrict the ways in which operators can be connected to each other, because the SPL language run time handles any incompatibilities by creating copies as needed. The port mutability settings do have an impact on performance in the presence of operator fusion. For instance, a mutating output port can pass a tuple to a mutating input port under fusion, without requiring any copies. Alternatively, in a fan-out configuration, a tuple that is submitted on an output port can be passed in to multiple non-mutating input ports, without creating any copies. The following figure provides a general guideline for setting port mutability.

Figure 2. Setting port mutability (two panels: input port mutability, output port mutability)

Accessing Parameters

For non-generic operators (operators using the pragma that was introduced earlier), parameters can be accessed in a type-safe manner using auto-generated convenience functions, which are created when the SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE pragma is expanded during operator generation. These member functions are named getParameter_<param-name> and return a value that has the same type as the parameter's first expression value that is specified in the application's SPL code. In other words:

<param-type> getParameter_<param-name>() const;

This helper function is generated only if the parameter's expressions are all attribute-free expressions (that is, expressions that do not involve stream tuples and attributes whose values are known only at run time). For information about accessing parameter values in a more generic way from within generic operators, see Implementing generic operators. As an example of parameter access for non-generic operators, consider the following SPL segment:

stream<...> MyStream = MyOperator(...) {
  param size : 10;
}

The value for the parameter size can be retrieved as follows:

int32 size = getParameter_size();

Three additional APIs are available to non-generic (and generic) operators for accessing parameter values in a more reflective manner.

// Return the set of all parameter names
const std::tr1::unordered_set<std::string>& getParameterNames() const;

class ParameterValue {
  public:
    virtual bool isValue() = 0;
    virtual bool isExpression() = 0;
    virtual ConstValueHandle& getValue() = 0;
};

For each expression for a parameter, there is a corresponding ParameterValue associated with it. If the expressionMode for the parameter in the operator model is Expression or Attribute, then isExpression() returns true, and isValue() returns false. For all other expressionMode values, isValue() returns true and isExpression() returns false. If isValue() is true, then getValue() can be called to return the value of the expression.

If allowAny is true for the parameters in the operator model and a parameter is not explicitly listed there, then the compiler examines each of its expressions. If an expression references a stream attribute, the ParameterValue for that expression has isExpression() == true. If no stream attribute is referenced, isValue() returns true.

typedef std::vector<ParameterValue*> ParameterValueListType;
typedef std::tr1::unordered_map<std::string, ParameterValueListType> ParameterMapType;

// Return the vector of parameter values for a given parameter name
const ParameterValueListType& getParameterValues(std::string const & param) const;

// Return the map of parameter names to parameter values
virtual ParameterMapType& getParameters();

The getParameterNames function can be used to check whether a parameter is present. This check enables working with optional parameters, which are not possible with the non-reflective parameter access APIs. The getParameterValues function is used to access the values of a parameter. The parameter name is passed as a string argument. Multiple parameter expression values are available, and the type and value of each ParameterValue with isValue() == true can be extracted with ConstValueHandle member functions. If param is not a valid parameter name for this operator instance, SPLRuntimeInvalidArgumentException is thrown.

An example use is as follows:

int32 size = 0; // the default
if (getParameterNames().count("size")) {
  // getParameterValues returns a const reference, so bind it to a const reference
  const Operator::ParameterValueListType& sizeValues = getParameterValues("size");
  assert(sizeValues[0]->isValue());
  size = sizeValues[0]->getValue();  // read the first expression value
}
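
The optional-parameter pattern (check for presence, then read the first value, otherwise keep a default) can be isolated in a small helper. The following is a minimal sketch in standard C++ under stated assumptions: SketchValueList, SketchParameterMap, and firstValueOr are hypothetical names, and the map holds plain int32 values rather than ParameterValue objects.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins: each parameter name maps to a list of
// already-evaluated values. These are NOT the SPL reflective types.
typedef std::vector<std::int32_t> SketchValueList;
typedef std::unordered_map<std::string, SketchValueList> SketchParameterMap;

// Returns the first value of an optional parameter, or the fallback
// when the parameter is absent or has no values.
inline std::int32_t firstValueOr(const SketchParameterMap& params,
                                 const std::string& name,
                                 std::int32_t fallback) {
    SketchParameterMap::const_iterator it = params.find(name);
    if (it == params.end() || it->second.empty()) return fallback;
    return it->second.front();
}

// Builds a parameter map that contains only "size", as in the SPL segment above.
inline SketchParameterMap sampleParameters() {
    SketchParameterMap params;
    params["size"].push_back(10);
    return params;
}
```

With these stand-ins, `firstValueOr(sampleParameters(), "size", 0)` yields the supplied value, while a lookup of a name that was never set falls back to the default instead of failing.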

The convenience functions hasParameter and getParameter are deprecated; replace them with the newer routines for accessing parameters that are described in this section.

For detailed information about ConstValueHandle and the reflective type system, see Using the reflective type system.