IBM Streams 4.2

Threading annotation

The SPL @threading annotation determines the threading model to use for the operators inside of a composite operator.

The annotation has the following syntax:

    @threading(model=threading-model)

where threading-model is one of manual, automatic, dynamic, or dedicated. When the model is dynamic, the optional threads and elastic elements can also be specified:

    @threading(model=dynamic, threads=value, elastic=value)

The @threading annotation can be applied to a composite definition or invocation. These annotations can be nested, and the innermost annotation takes precedence. This nesting makes it possible to specify one threading model for most of the application, but switch to other threading models for subsets of it.
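As a minimal sketch of this nesting (the Inner composite and its operators are hypothetical), the annotation on the Inner invocation is more deeply nested than the annotation on the Main definition, so it takes precedence for the operators inside Inner:

    // Hypothetical sketch: the outer annotation applies the dynamic model
    // to the whole application; the inner annotation on the invocation of
    // Inner is more deeply nested, so the operators inside that composite
    // run under the manual threading model instead.
    @threading(model=dynamic)
    composite Main {
        graph
            @threading(model=manual)
            stream<Data> In = Inner() {}

            () as Sink = Custom(In) {}
    }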

The following compile-time rules govern how conflicting @threading annotations apply to an operator:
  • The composite invocation takes precedence over the composite definition.
  • The most deeply nested annotation takes precedence.
The following runtime rules govern how conflicting PE-wide rules from different operators apply to the PE:
  • All operators in a PE under the dynamic threading model will share the same thread pool, even if the operators' threading models were specified by different threading annotations.
  • Operators in a PE might have conflicting dynamic threading requests for the number of threads and elasticity. These conflicts are resolved by always favoring the higher number of requested threads, and favoring turning elasticity off.
  • The PE respects all preexisting threaded ports, including under the dynamic and dedicated threading models.
There are some threads in the PE that the threading model does not control. These threads are:
  • Operator threads. An operator thread is any thread started by an operator. Source operators always have their own thread, but non-source operators can also have their own threads.
  • PE input port threads. Each PE input port has its own thread which receives tuples from the network and submits those tuples downstream.
  • Threaded ports. Developers can explicitly add threads to their application through the threadedPort configuration option. A threaded port executes the operator it is applied to, and all downstream operators, until it reaches an operator that is executed by a different thread.
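For reference, a threaded port is requested with the threadedPort configuration option on an operator invocation. This minimal sketch uses hypothetical stream and operator names:

    // Hypothetical sketch: the threadedPort config asks the SPL runtime to
    // insert a queue and a dedicated thread in front of this input port.
    // Sys.Wait makes the submitting thread block when the queue is full.
    stream<Data> Parsed = Parse(Src) {
        config threadedPort: queue(Src, Sys.Wait);
    }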

Required elements

model
Specifies the threading model. The following options are supported:
manual
Each operator input port is executed by the thread that executed the operator upstream from it, unless the programmer manually adds a threaded port to an operator input port. The SPL runtime does not insert any threads. All threads come from operators, explicitly requested threaded ports, threaded ports injected by user-defined parallelism, or PE input ports. This threading model favors reducing individual tuple latency over throughput.
dynamic
Each operator input port can be executed by any thread. The assignment of threads to input ports can change at runtime, as can the number of available threads. By default, the dynamic threading model will start with the default number of threads, and periodically increase or decrease the number of threads to improve overall PE throughput. This threading model favors increasing overall throughput at the expense of individual tuple latency. The following options are supported:
threads
An integer value that specifies the number of threads to start with; values must be greater than or equal to 1. The default number of threads is one more than the maximum number of input ports for a single operator across all operators in the PE. This default ensures that there are enough threads to simultaneously execute all input ports for any given operator, plus an additional thread to process other operators.
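To illustrate the default, a hedged sketch with placeholder operators: if all of the operators below are fused into one PE, the Barrier has the most input ports (two), so the default would be three threads. The annotation overrides that default:

    // Hypothetical sketch: Barrier has 2 input ports, the most of any
    // operator here, so the default dynamic thread count for this PE
    // would be 2 + 1 = 3. The annotation starts with 6 threads instead.
    @threading(model=dynamic, threads=6)
    composite Main {
        graph
            stream<Data> A = Beacon() {}
            stream<Data> B = Beacon() {}
            stream<Data> Merged = Barrier(A; B) {}
            () as Sink = Custom(Merged) {}
    }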
elastic
A boolean value that controls whether elasticity is active (true) or inactive (false). If elasticity is active, the SPL runtime periodically evaluates whether it should change the number of active threads to improve overall PE throughput. If elasticity is inactive, the number of threads does not change during the lifetime of the PE; as a consequence, threads controls the number of threads for the lifetime of the PE. The default value is true, which activates elasticity.
automatic
During PE initialization, the SPL runtime chooses between the dynamic and manual threading models. The SPL runtime uses both application information (such as the number of operators and the number of preexisting threads in the PE) and system information (such as the number of logical processors on the host) to make the decision. While the SPL runtime tries to choose the best threading model for the PE and the system it runs on, it is not guaranteed to do so. This option is the default.
dedicated
Each operator input port has a dedicated thread that processes all tuples on that input port for that operator. Note that under this threading model, the number of additional threads in a PE is determined entirely by the number of operator input ports in the PE. If the number of operator input ports is greater than the number of logical processors on the host, it is possible to over-subscribe the system. This threading model favors increasing overall throughput over individual tuple latency, but it is not adaptive.

Examples

Example 1: Setting the automatic annotation on the main composite
Applying the automatic threading model on the main composite definition means that the entire SPL application will execute under the automatic threading model. This is the default behavior. Each PE in the application will independently determine if it should use the manual or dynamic threading model.
    @threading(model=automatic)
    composite Main {
        graph
            stream<Data> Src = Beacon() {}
            stream<Data> A = Functor(Src) {}
            stream<Data> B = Functor(A) {}
            stream<Data> C = Functor(B) {}
            stream<Data> D = Functor(C) {}
            stream<Data> E = Functor(D) {}
            () as Sink = Custom(E) {}
    }
Example 2: Setting the dynamic annotation on the main composite
Applying the dynamic threading model on the main composite chooses the dynamic threading model for the entire SPL application. This example also explicitly sets the number of threads and turns off elasticity. Note that each PE in this application will have 8 threads and elasticity will be inactive. In general, explicitly setting the number of threads and turning off elasticity should only be done when the developer knows exactly how the application will be fused, and knows exactly what resources are available on the hosts each PE will run on.
    @threading(model=dynamic, threads=8, elastic=false)
    composite Main {
        graph
            stream<Data> Src = Beacon() {}
            stream<Data> Res = ManyOperators(Src) {}
            () as Sink = Custom(Res) {}
    }
Example 3: Overriding the model for a specific invocation

This example applies the dynamic threading model to the main composite definition, but then applies the manual threading model to a particular composite invocation. Most of the application will execute under the dynamic threading model, but the subset of the application in the Ingest composite will execute under the manual threading model.

The fusion and host placement constraints in the Ingest composite mean that all of its operators will be fused into the same PE, which will run on a particular host with special access to the outside network. The developer knows that the first-level parsing operator is resource-intensive and that, to improve throughput, it should be executed by a separate thread. To achieve this, the developer places a threaded port on that operator. The subsequent operators are lightweight, and they are run by the same thread that performs the heavy-weight parsing. Because the developer knows the performance characteristics of the operators in the Ingest composite, they can switch to the manual threading model and manually place threads where they know they should go.

    composite Ingest(output Out) {
        graph
            stream<Data> Src = SpecializedSource() {
                config placement: host(Ingest),
                                  partitionColocation("ingest");
            }
            stream<Data> Parsed = SpecializedHeavyParsing(Src) {
                config threadedPort: queue(Src, Sys.Wait);
                       placement: partitionColocation("ingest");
            }
            stream<Data> Filtered = Filter(Parsed) {
                param filter: Parsed.age > 13;
                config placement: partitionColocation("ingest");
            }
            stream<Data> Out = SpecializedLightParsing(Filtered) {
                config placement: partitionColocation("ingest");
            }
    }

    @threading(model=dynamic)
    composite Main {
        graph
            @threading(model=manual)
            stream<Data> In = Ingest() {}

            stream<Data> Processed = Processing(In) {}
            () as Sink = Egress(Processed) {}

        config hostPool: Ingest = createPool({size=1u, tags=["ingest"]}, Sys.Exclusive);
    }
Example 4: Overriding the threading model on a composite definition

This example is similar to Example 3, but with two key differences. First, the Ingest composite exclusively contains operators with heavy-weight processing. In this case, the developer wants to place a thread between each pair of operators, and the dedicated threading model is most appropriate. Second, the threading model annotation goes on the composite definition, not its invocation. In these examples, that placement does not change the behavior. In general, however, applying a threading model to a composite definition means that the threading model applies to every invocation of that composite. Applying a threading model to just an operator's invocation applies that threading model to only that single invocation. Additionally, if the composite operator also has a threading model on its definition, the threading model from the invocation overrides it.

    @threading(model=dedicated)
    composite Ingest(output Out) {
        graph
            stream<Data> Src = SpecializedSource() {
                config placement: host(Ingest),
                       partitionColocation("ingest");
            }
            stream<Data> Pre = SpecializedHeavyPreParsing(Src) {
                config placement: partitionColocation("ingest");
            }
            stream<Data> Parsed = SpecializedHeavyParsing(Pre) {
                config placement: partitionColocation("ingest");
            }
            stream<Data> Out = SpecializedHeavyPostParsing(Parsed) {
                config placement: partitionColocation("ingest");
            }
    }

    @threading(model=dynamic)
    composite Main {
        graph
            stream<Data> In = Ingest() {}
            stream<Data> Processed = Processing(In) {}
            () as Sink = Egress(Processed) {}

        config hostPool: Ingest = createPool({size=1u, tags=["ingest"]}, Sys.Exclusive);
    }