Package com.ibm.streams.flow.declare

Declaration of an SPL directed flow graph of Java primitive operators.

See: Description

Package com.ibm.streams.flow.declare Description

Declaration of an SPL directed flow graph of Java primitive operators.

Overview

OperatorGraph allows for the declaration of an arbitrary graph of Java primitive operators that follows SPL semantics. A Java primitive operator is represented by a reference to a class that implements Operator.

Testing Java Operators

The creation of the graph and the subsequent execution of it, supports the testing of SPL Java primitive operator code in a Java environment, by using testing frameworks such as JUnit.

A graph that can be tested is represented as a JavaTestableGraph and can contain disconnected input and output ports.
If ports of the graph are all connected then the test program will typically verifies the behavior by testing the affect of the graph on an external system, for example testing that a database sink operator produced the correct output in a database.
By using the JavaTestableGraph.getInputTester(InputPortDeclaration) method a test program can directly submit tuples to an operator. The returned StreamingOutput reference represents a test output port that is connected directly to the input port.
A test progam can use the disconnected output ports to verify operator or graph output by using a StreamHandler registered with JavaTestableGraph.registerStreamHandler(OutputPortDeclaration, StreamHandler).

Example of a Declared Graph

Here's a simple example of a graph that contains two operators, a com.ibm.streams.operator.samples.sources.RandomBeacon submitting tuples on a single output port to a filter operator that uses the com.ibm.streams.operator.samples.operators.Regex class.
 
   OperatorGraph graph = OperatorGraphFactory.newGraph();

   // Declare a beacon operator
   OperatorInvocation beacon = graph.addOperator(RandomBeacon.class);
   beacon.setIntParameter("iterations", 100);
   OutputPortDeclaration beaconOutput = beacon.addOutput("tuple");

   // Declare the filter operator
   OperatorInvocation filter = graph.addOperator(Regex.class);
   filter.setStringParameter("patterns",  "^[ABC].*", "^D.*");

   // Declare an input port connected to to the beacon's output
   InputPortDeclaration filterInput = filter.addInput(beaconOutput);

   // Declare the output port for the filter using the same
   // schema as its input
   OutputPortDeclaration filterOutput = filter.addOutput(filterInput.getStreamSchema());
 
 

Example using Fluent Interface Technique

Methods in this package usually return a reference that can be used to declare additional invocatation state. This Fluent Interface style then allows compact representation of a graph in Java code.
Example of connecting two operators
 
   OperatorGraph graph = OperatorGraphFactory.newGraph();

   // Declare a beacon operator
   OutputPortDeclaration beaconOutput =
         graph.addOperator(RandomBeacon.class)
              .setIntParameter("iterations", 100)
              .addOutput("tuple");

   // Declare the filter operator connected to to the beacon's output
   // and an output port of the same type.
   OutputPortDeclaration filterOutput =
         graph.addOperator(Regex.class)
               .setStringParameter("patterns",  "^[ABC].*", "^D.*")
               .addInput(beaconOutput)
               .operator()
               .addOutput(beaconOutput.getStreamSchema());
 
 
Example of declared a windowed input port
 
    OperatorGraph graph = OperatorGraphFactory.newGraph();

    OperatorInvocation op = graph.addOperator(TumblingSortOperator.class);

    // Here a window declaration is similar to an SPL window declaration
    op.addInput("tuple").tumbling().evictCount(200);
 
 

Limitations

  • Only Java primitive operators defined by Java classes that implement Operator can be included in the graph.
  • SPL dynamic connections are not supported.
  • The SPL logic clause is not supported.
  • The SPL configuration clause is not supported.
  • SPL annotations are not supported, therefore:
  • Checkpointing is not supported.
Since:
InfoSphere® Streams Version 3.1