IBM Streams 4.2

Parallel annotation

To take advantage of data-parallelism, apply the @parallel annotation to invocations of primitive or composite operators. To apply data-parallelism to an arbitrary group of operators, refactor those operators into a single composite operator, and then apply the @parallel annotation to the invocation of that composite operator.
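The refactoring that is described above can be sketched as follows. This is a minimal illustration under stated assumptions: the composite name FilterAndScale, the stream names, and the tuple schema are hypothetical and were chosen only for this sketch. The Filter and Functor operators are grouped into one composite operator, and the @parallel annotation on the composite invocation replicates both operators in each channel.

    // Hypothetical composite that groups two operators so that they
    // can be parallelized as one unit.
    composite FilterAndScale(output Out; input In) {
        graph
            stream<rstring key, int32 value> Kept = Filter(In) {
                param
                    filter: value > 0;
            }

            stream<rstring key, int32 value> Out = Functor(Kept) {
                output
                    Out: value = value * 2;
            }
    }

    composite Main {
        graph
            // Illustrative source that generates 100 tuples.
            stream<rstring key, int32 value> Source = Beacon() {
                param
                    iterations: 100u;
                output
                    Source: key = "k" + (rstring)IterationCount(),
                            value = (int32)IterationCount();
            }

            // Each of the three channels contains its own Filter and Functor.
            @parallel(width=3)
            stream<rstring key, int32 value> Result = FilterAndScale(Source) {
            }

            () as Sink = Custom(Result) {
                logic
                    onTuple Result: println(Result);
            }
    }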
>>-@parallel(width=number-| Optional elements |-)--------------><

Optional elements

|--+----------------------------------------------------------------------------+-->
   |                  .-,---------------------------------------------------.   |   
   |                  |                               .-,--------------.    |   |   
   |                  V                               V                |    |   |   
   '-,--partitionBy=[---{port=port_name,-attributes=[---attribute_name-+-]}-+-]-'   

>--+--------------------------------+--------------------------->
   |                .-,---------.   |   
   |                V           |   |   
   '-,--broadcast=[---port_name-+-]-'   

>--+---------------------------------------+--------------------|
   |                        .-,--------.   |   
   |                        V          |   |   
   '-,--replicateHostTags=[---host_tag-+-]-'   

Required elements

width
Specifies the number of channels in a parallel region. For each channel in a parallel region, IBM® Streams replicates the invoked operator. The width must be one of the following: an int32 literal, an expression<int32> composite parameter, or a submission-time value that is cast to the int32 data type.
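The three accepted forms of the width element can be sketched as follows. The fragments are independent of each other. The operator name Op, the composite name Region, the parameter name $width, and the submission-time value name regionWidth are placeholders that were chosen for illustration.

    // Form 1: an int32 literal.
    @parallel(width=4)
    stream<Type> Output = Op(Input) {
        // ...
    }

    // Form 2: an expression<int32> composite parameter.
    composite Region(output Out; input In) {
        param
            expression<int32> $width;
        graph
            @parallel(width=$width)
            stream<Type> Out = Op(In) {
                // ...
            }
    }

    // Form 3: a submission-time value that is cast to int32.
    @parallel(width=(int32)getSubmissionTimeValue("regionWidth"))
    stream<Type> Output = Op(Input) {
        // ...
    }

With the third form, the number of channels is not fixed at compile time; it is taken from the value that is supplied when the job is submitted.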

Optional elements

partitionBy
Specifies how to partition tuples across the channels for specific ports that enter a parallel region. Ports are individually partitioned and each port can be different. There is one splitter per port. Partitioning a parallel region ensures that tuples with specific values are always routed to the same channel. These specific values are called partitioning keys, and are specified as tuple attributes. The partitioning specification for a parallel region is supplied as a list of port and attribute pairs. The pairs are specified as follows:
attributes
Specifies the partitioning keys as a list of attributes. The splitter routes tuples that have the same values for these attributes to the same parallel channel. The attributes must exist in the tuple type that is specified for the input port.
port
Specifies the name of the input port for a parallel region. The name must identify an input port on the operator invocation to which the @parallel annotation applies.
If a port is not partitioned, the splitter can route any tuple to any channel.
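Because each port has its own splitter, different ports can use different partitioning keys. The following fragment is a sketch that follows the syntax diagram above; the port names Orders and Payments and the attribute names customerId, region, and accountId are hypothetical.

    // Two ports, each with its own list of partitioning keys.
    @parallel(width=4, partitionBy=[{port=Orders, attributes=[customerId, region]},
                                    {port=Payments, attributes=[accountId]}])
    stream<Type> Output = Op(Orders; Payments) {
        // ...
    }

In this sketch, tuples on the Orders port that have the same customerId and region values are routed to the same channel. Tuples on the Payments port are partitioned independently, by accountId only.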
broadcast
Specifies that the tuples from the listed ports are not split, but are broadcast to all channels. The port names must identify input ports on the operator invocation to which the @parallel annotation applies.
replicateHostTags
Specifies host tags that identify the host pools to replicate in a parallel region. When a host pool is replicated, each replica is used by only a single channel in the parallel region. By replicating host pools, you can assign sibling operators in a parallel region to different sets of hosts.
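The following fragment sketches how replicateHostTags might be combined with a host pool. It is an illustration under assumptions, not a verified configuration: the pool name WorkerPool, the host tag "worker", and the pool size are hypothetical, and the sketch assumes that host tags are written as string literals and that the instance contains enough hosts with that tag.

    composite Main {
        graph
            // ...
            @parallel(width=3, replicateHostTags=["worker"])
            stream<Type> Output = Op(Input) {
                config
                    // Place the operator on a host from the tagged pool.
                    placement: host(WorkerPool);
            }
        config
            // Hypothetical pool of one host that carries the "worker" tag.
            hostPool: WorkerPool = createPool({size=1u, tags=["worker"]}, Sys.Shared);
    }

Under these assumptions, each of the three channels would get its own replica of WorkerPool, so the replicated Op operators can be placed on different hosts.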

Examples

Example 1: Parallel regions that are not partitioned
Example 1 shows the simplest form of the @parallel annotation: a parallel region that is not partitioned. The annotation is applied to the invocation of the Op operator and is placed immediately before the first output stream of that invocation; the newline after the annotation is not required. The width element specifies five channels, so the Op operator is replicated five times. The output port of the operator that produces the Input stream therefore uses a splitter. Because the partitionBy element is not specified, the splitter can route any tuple to any channel.
    @parallel(width=5)
    stream<Type> Output = Op(Input) {
        // ...
    }
Example 2: Partitions in a parallel region
To partition a parallel region, you specify the partitioning keys for each input port. The partitioning keys are the tuple attributes that the splitter uses to create the partition, and they must be attributes of the stream type of the input port. In Example 2, the splitter for the Input1 port partitions the tuples based on the attr1 and attr2 attributes. The partitioning for the Input1 port is implemented with a hash, and the splitter routes each tuple based on its hash value. The Input2 port is not listed in the partitionBy element, so its splitter does not partition the tuples and they are evenly distributed among the parallel channels.
    @parallel(width=5, partitionBy=[{port=Input1, attributes=[attr1, attr2]}])
    stream<Type> Output = Op(Input1; Input2) {
        // ...
    }
Example 3: Partitioning for two streams that are coming into one port
Partitioning applies to all the streams that feed the port. Example 3 shows two streams, Input1 and Input2, that feed one port. Notice that the Input1 and Input2 streams are partitioned in the same way.
    @parallel(width=5, partitionBy=[{port=Input1, attributes=[attr1, attr2]}])
    stream<Type> Output = Op(Input1, Input2) {    
        // ...
    }
Example 4: Broadcasting tuples
Broadcasting sends all tuples from a port to all channels. This option is useful if the operators in every channel of a parallel region need to receive the same data. Broadcasting and partitioning are mutually exclusive: a port can be broadcast or partitioned, but not both. In Example 4, the Updates input port carries tuples that arrive infrequently and represent periodic updates that the operators in all channels need to process. The Input port carries tuples that are processed in parallel, partitioned on the attr1 attribute.
    @parallel(width=7, partitionBy=[{port=Input, attributes=[attr1]}], broadcast=[Updates])
    stream<Type> Output = Op(Input; Updates) {    
        // ...
    }