IBM Streams 4.3.0

Nested parallel regions

Nested user-defined parallelism (nested UDP) allows parallel regions to contain other parallel regions in your IBM® Streams applications. If you change the width of a nested parallel region at run time, the change applies to all replicas of that nested region.

The ability to nest parallel regions allows toolkit developers to incorporate user-defined parallelism (UDP) into their operators while still allowing those operators to be incorporated into parallel regions themselves. To use nested parallelism, add the @parallel annotation to a primitive or composite operator invocation that is itself inside a composite operator invoked with the @parallel annotation. For example:
    composite Inner(input In; output C) {
        graph
            stream<int32 i> A = Functor(In) {}
            stream<int32 i> B = Functor(A) {}
            stream<int32 i> C = Functor(B) {}
    }

    composite Outer {
        graph
            stream<int32 i> Src = Beacon() {}
            
            @parallel(width=3)
            stream<int32 i> Op = Inner(Src) {}

            stream<int32 i> Sink = Custom(Op) {}
    }

    composite Main {
        graph
            @parallel(width=2)
            () as App = Outer() {}
    }

This example of nested parallelism does not require any concepts beyond those already present in non-nested parallel regions. However, referring to particular parallel channels in nested parallel regions does introduce several new concepts.

First are the concepts of local channels and global channels. In the previous example, a parallel region of width 3 is nested within a parallel region of width 2. The nested (inner) parallel region, which by itself has three channels, is replicated twice when the outer parallel region is transformed, resulting in six actual channels within the inner parallel region. So that the inner channels can be uniquely identified, each is given an index in the range 0-5, inclusive; this index is referred to as the global channel index. The local channel index does not consider any nesting, so for the inner parallel region the local channel index is in the range 0-2, inclusive. It follows that, for non-nested parallel regions, the local and global indexes are the same.

The getChannel() function returns global channel numbers, and local channel numbers have their own function, getLocalChannel(). getChannel() returns the global channel number, rather than the local one, because the global number is more useful as a globally unique identifier. For example, a common pattern in parallel regions is for operators to use their channel number to access an external resource; the global channel number enables this pattern.
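For illustration, the following sketch (not part of the original example) replaces the last Functor in the Inner composite with a hypothetical Custom operator that logs both indexes. With the outer width of 2 and the inner width of 3, getChannel() reports a value in the range 0-5 that is unique across all six replicas, while getLocalChannel() reports a value in the range 0-2 that repeats in each replica of the outer region:
    composite Inner(input In; output C) {
        graph
            stream<int32 i> A = Functor(In) {}
            stream<int32 i> B = Functor(A) {}

            // Hypothetical variant of the final operator: forwards each tuple
            // and logs the global and local channel indexes of this replica.
            stream<int32 i> C = Custom(B) {
                logic onTuple B: {
                    printStringLn("global channel " + (rstring)getChannel() +
                                  ", local channel " + (rstring)getLocalChannel());
                    submit(B, C);
                }
            }
    }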

As a consequence of the getChannel() function returning the global channel number, global channel numbers are also used in physical operator names. For example, the physical operator name Outer[1].Middle[5].Inner[11] means that this operator is the twelfth replica of Inner across all nesting levels. It is inside the sixth replica of the Middle composite operator, which is inside the second replica of the Outer composite operator.

Second is the concept of the closest enclosing parallel region. For any given operator in a parallel region, the closest enclosing parallel region is the parallel region in which the operator is most immediately located. In the first example, for the operators inside Inner, the closest enclosing parallel region is the width-3 region created by the @parallel annotation on the invocation of Inner, not the width-2 region created in Main.

Changing nested parallel regions at run time

You can change the parallel width at any level of a nested parallel region, but the change will apply to all parallel replicas at that level. You must use the logical name to refer to the parallel region, which means that you cannot specify individual replicas.

Physical operator names in nested parallel regions are based on the global channel numbers. For example, consider a nested parallel region where the composite operator Foo is replicated with a width of 4, Foo contains the composite operator Bar, which is replicated with a width of 2, and Bar contains the primitive operator Op, which is replicated with a width of 3. The physical operator Foo[3].Bar[7].Op[23] is the physical operator with the highest global channel number across all nesting levels: the global index at each level is the parent's global index multiplied by the width at that level, plus the local index, so Bar[7] comes from 3 × 2 + 1 and Op[23] from 7 × 3 + 2. This replica of Foo.Bar.Op is the 24th of the 24 replicas in the entire parallel region, across all nesting levels. But when you change the parallel region width, you must use the logical name, Foo.Bar.Op:

streamtool updateoperators myjob --force --parallelRegionWidth Foo.Bar.Op=2

After this parallel region width change, the physical operator with the highest global channel number across all nesting levels is Foo[3].Bar[7].Op[15], because Op now has only two local channels per replica of Bar (7 × 2 + 1 = 15). Using the logical operator name (such as Foo.Bar.Op) to refer to nested parallel regions is not a special case; it is an instance of the general case of referring to nested composite operators.

Functions that control operator behavior within parallel regions

You can use the following functions to control operator behavior within parallel regions:
int32 getChannel()
Returns an int32 which is the global index of the parallel channel.
Note: This function returns -1 if the calling operator is not located in a parallel region.
int32 getMaxChannels()
Returns an int32 which is the global maximum number of parallel channels.
Note: This function returns 0 if the calling operator is not located in a parallel region.
int32 getLocalChannel()
Returns an int32 which is the local index of the parallel channel in the closest enclosing parallel region.
Note: This function returns -1 if the calling operator is not located in a parallel region.
int32 getLocalMaxChannels()
Returns an int32 which is the local number of parallel channels. This is the number of local channels in the closest enclosing parallel region.
Note: This function returns 0 if the calling operator is not located in a parallel region.
list<int32> getAllChannels()
Returns a list of int32 values, where each element of the list is the global channel index for one of the enclosing parallel regions that this operator is in (see the sketch after this list). The 0th element of the list is the global channel index within the closest enclosing parallel region, which means that:
 getAllChannels()[0] == getChannel()
Note: This function returns an empty list if the calling operator is not located in a parallel region.
list<int32> getAllMaxChannels()
Returns a list of int32 values, where each element of the list is the global maximum number of parallel channels for one of the enclosing parallel regions. The 0th element is the global maximum number of channels for the closest enclosing parallel region, which means that:
 getAllMaxChannels()[0] == getMaxChannels()
Note: This function returns an empty list if the calling operator is not located in a parallel region.
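As a sketch of how the list-returning functions behave, the hypothetical Check operator below could be added to the graph clause of the Inner composite from the first example (outer width 2, inner width 3), consuming stream B. For the replica that runs on outer channel 1 and inner local channel 2, getChannel() returns 5 and getLocalChannel() returns 2, so getAllChannels() returns [5, 1] and getAllMaxChannels() returns [6, 2]:
    // Hypothetical operator invocation for the graph clause of Inner.
    stream<int32 i> Check = Custom(B) {
        logic onTuple B: {
            // These identities hold in every replica.
            assert(getAllChannels()[0] == getChannel());
            assert(getAllMaxChannels()[0] == getMaxChannels());
            // For the replica on outer channel 1, inner local channel 2,
            // this prints: all=[5,1] allMax=[6,2]
            printStringLn("all=" + (rstring)getAllChannels() +
                          " allMax=" + (rstring)getAllMaxChannels());
            submit(B, Check);
        }
    }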
In addition, you can use two intrinsic functions that impact the fusion of operators in a parallel region. You can only use these functions within the partitionColocation, partitionExlocation, hostColocation and hostExlocation placement configs.
rstring byChannel()
Indicates that operators within a channel are to be fused together. This function computes a string from the global channel number and the name of the operator on which the placement config is placed. Consider the following example:
    type Data = int32 i;

    composite Par2(input I; output F1) {
        graph
            stream<Data> F1 = Functor(I) {}
    }

    composite Par1(input I; output P2) {
        graph
            stream<Data> P1 = Functor(I) {}

            @parallel(width=3)
            stream<Data> P2 = Par2(P1) {}
    }

    composite Main {
        graph
            stream<Data> Beat = Beacon() { }

            @parallel(width=2)
            stream<Data> P = Par1(Beat) {
                config placement: partitionColocation(byChannel());
            }

            () as Out = FileSink(P) {
                param file: "out.dat";
            }
    }
Due to the way configs propagate, all the operators that are part of the subgraph defined by the invocation of Par1 will see a partitionColocation with the string "P_" + (string)getChannel(). This ensures that one partition contains all the operators from global channel 0 of the outer parallel region and global channels 0-2 of the inner parallel region, and another partition contains all the operators from global channel 1 of the outer parallel region and global channels 3-5 of the inner parallel region.
rstring byReplica()
Indicates that replicated operators are fused together across channels. This function computes a string from the name of the operator on which the placement config is placed. Consider the following example:
    type Data = int32 i;

    composite Par2(input I; output F1) {
        graph
            stream<Data> F1 = Functor(I) {}
    }

    composite Par1(input I; output P2) {
        graph
            stream<Data> P1 = Functor(I) {}

            @parallel(width=3)
            stream<Data> P2 = Par2(P1) { 
                config placement: partitionColocation(byReplica());
            }
    }

    composite Main {
        graph
            stream<Data> Beat = Beacon() {}

            @parallel(width=2)
            stream<Data> P = Par1(Beat) {}

            () as Out = FileSink(P) {
                param file: "out.dat";
            }
    }
In this example, all the operators in the subgraph defined by the invocation of Par2 are placed in the same partition.

The @parallel annotation also has an option, replicateHostTags, for host pool replication. Host pools that carry the tags listed in the annotation are replicated once for each channel in the region (leaving the original pool unused). When a pool is replicated, its tags are extended with the pattern _i, where i is a channel number. For example, if the tag "foo" is a replicated host tag and it appears in a parallel region of width 3, the tags for the three channels become "foo_0", "foo_1", and "foo_2".
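As a minimal sketch of this non-nested case (the composite, stream, and pool names here are invented for illustration), a region of width 3 that replicates the "foo" tag causes each channel to request hosts from a differently tagged pool:
    composite TagExample {
        graph
            stream<int32 i> Src = Beacon() {}

            // Each of the three channels is placed on hosts tagged "foo_0",
            // "foo_1", or "foo_2"; the original "foo" pool itself goes unused.
            @parallel(width=3, replicateHostTags=["foo"])
            stream<int32 i> Work = Functor(Src) {
                config placement: host(fooHosts);
            }

        config hostPool: fooHosts = createPool({tags=["foo"], size=1u}, Sys.Shared);
    }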

For nested parallel regions, this tag-replication scheme is extended by appending the channel index within each enclosing parallel region, from the closest region to the furthest. For example, consider an extension of the first example:
    composite Inner(input In; output C) {
        graph
            stream<int32 i> A = Functor(In) {
                config placement: host(innerHosts); 
            }
            stream<int32 i> B = Functor(A) {
                config placement: host(innerHosts); 
            }
            stream<int32 i> C = Functor(B) {
                config placement: host(innerHosts); 
            }
    }

    composite Outer {
        graph
            stream<int32 i> Src = Beacon() {}
            
            @parallel(width=3, replicateHostTags=["innerHosts"])
            stream<int32 i> Op = Inner(Src) {}

            stream<int32 i> Sink = Custom(Op) {}
    }

    composite Main {
        graph
            @parallel(width=2)
            () as App = Outer() {}

        config hostPool: innerHosts = createPool({tags=["innerHosts"], size=1u}, Sys.Shared);
    }

In the example, the generated host tags will have the form "innerHosts_i_j" where i is either 0, 1 or 2 (for the inner width of 3), and j is 0 or 1 (for the outer width of 2).