Primitive operators
SPL also has primitive operators, which encapsulate code in a native language.
Recall that an operator is a reusable stream transformer, and a composite operator encapsulates a stream graph. If all operators were composite, there would be a chicken-and-egg problem; therefore, SPL also has primitive operators, which encapsulate code in a native language. The language is usually a more traditional, von Neumann language such as Java™ or C++. This section, describes a primitive operator RoundRobinSplit in C++, but IBM® Streams also provides the ability for you to write primitive operators in Java. If you are not a C++ programmer, or if you anticipate that you will mostly use operators from the SPL standard toolkit or other toolkits, you can skip this information. First, here is an example of invoking RoundRobinSplit from SPL code in the following stream graph:

use my.util::RoundRobinSplit;
composite Main {
graph
stream<int32 count> Input = Beacon() {
logic state : mutable int32 n = 0;
param iterations : 10u;
output Input : count = n++;
}
(stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input) {
param batch : 2u;
}
stream<int32 count, int32 path> B0 = Functor(A0) {
output B0 : path = 0;
}
stream<int32 count, int32 path> B1 = Functor(A1) {
output B1 : path = 1;
}
stream<int32 count, int32 path> Output = Pair(B0; B1) {}
() as Writer = FileSink(Output) {
param file : "/dev/stdout";
flush : 1u;
}
}
Line 9, (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input), invokes operator RoundRobinSplit to produce two output streams A0 and A1. The operator takes a parameter param batch : 2u that indicates that it alternates after every two tuples. Line 18 invokes operator Pair on two input streams B0 and B1, with the code stream<int32 count, int32 path> Output = Pair(B0; B1). For now, put this code into a file RoundRobinSplit/Main.spl. However, do not try to compile it yet; first you must implement the operator RoundRobinSplit.
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
MY_OPERATOR();
void process(Tuple & tuple, uint32_t port);
private:
Mutex _mutex;
uint32_t _count;
};
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
MY_OPERATOR::MY_OPERATOR() : _count(0) {}
void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {
uint32_t const nOutputs = getNumberOfOutputPorts();
uint32_t const batchSize = getParameter("batch");
AutoPortMutex apm(_mutex, *this);
uint32 outputPort = (_count / batchSize) % nOutputs;
_count = (_count + 1) % (batchSize * nOutputs);
assert(outputPort < nOutputs);
submit(tuple, outputPort);
}
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE
The constructor just initializes the _count instance variable to zero. The process method queries the runtime APIs for the number of output ports (Line 4) and the batch size parameter (Line 5); acquires the mutex to guard against concurrent manipulation of the _count instance variable (Line 6); determines the output port (Line 7), updates _count (Line 8), and submits the input tuple to the appropriate output port (Line 10). The mutex is necessary because without it, if there are two threads T1 and T2 , then T1 's invocation of process might be interrupted in the middle of Line 8, after it reads the old value of _count but before it writes the new value; then T2 might call process and update _count; and finally, T1 might resume and overwrite T2 's update to _count.
0,0
2,1
1,0
3,1
4,0
6,1
5,0
7,1
Each line shows the count and path attributes that are separated by a comma. Because the split uses a batch size of two but the join uses a batch size of one, the counts (left column) have a progression of 0,2,1,3,4,6,5,7 whereas the paths (right column) just alternate between 0,1,0,1,0,1,0,1. This output is deterministically repeatable, independent of the processing speed of the two paths.
my.util/RoundRobinSplit/RoundRobinSplit_cpp.cgt:10: error:
no matching function for call to ‘SPL::_Operator::A0::submit(SPL::uint32&, SPL::Tuple&)'
note: candidates are: virtual void SPL::Operator::submit(SPL::Tuple&, uint32_t)
note: virtual void SPL::Operator::submit(const SPL::Tuple&, uint32_t)
note: void SPL::Operator::submit(const SPL::Punctuation&, uint32_t)
This section barely scratched the surface of developing primitive operators in SPL. There is a rich API for generating specialized code for performance, and for compile-time error checking on things like the number and types of ports.