Nested parallel regions

Nested user-defined parallelism (nested UDP) allows parallel regions to contain other parallel regions in your IBM® Streams applications.

The ability to nest parallel regions allows toolkit developers to incorporate user-defined parallelism (UDP) into their operators while still allowing those operators to be used inside parallel regions themselves. To use nested parallelism, add the @parallel annotation to a primitive or composite operator invocation that is itself inside a composite operator invoked with the @parallel annotation. For example:
    composite Inner(input In; output C) {
        graph
            stream<int32 i> A = Functor(In) {}
            stream<int32 i> B = Functor(A) {}
            stream<int32 i> C = Functor(B) {}
    }

    composite Outer {
        graph
            stream<int32 i> Src = Beacon() {}
            
            @parallel(width=3)
            stream<int32 i> Op = Inner(Src) {}

            stream<int32 i> Sink = Custom(Op) {}
    }

    composite Main {
        graph
            @parallel(width=2)
            () as App = Outer() {}
    }

This example requires no concepts beyond those of ordinary parallel regions. However, referring to particular parallel channels within nested parallel regions does introduce several new concepts.

First are the concepts of local channels and global channels. In the previous example, a parallel region of width 3 is nested within a parallel region of width 2. The nested (inner) parallel region, which by itself has three channels, is replicated twice when the outer parallel region is transformed, resulting in six actual channels within the inner parallel region. So that these inner channels can be uniquely identified, each is given an index in the range 0 through 5, inclusive; this index is referred to as the global channel index. The local channel index ignores nesting, so for the inner parallel region the local channel index is in the range 0 through 2, inclusive. It follows that, for non-nested parallel regions, the local and global indexes are the same.
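
To make this numbering concrete, the six global channels of the inner parallel region map onto the two outer channels as follows (this layout matches the byChannel example later in this topic):

    outer channel 0 -> inner global channels 0, 1, 2 (inner local channels 0, 1, 2)
    outer channel 1 -> inner global channels 3, 4, 5 (inner local channels 0, 1, 2)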

Second is the concept of the closest enclosing parallel region. For any given operator in a parallel region, the closest enclosing parallel region is the parallel region in which that operator is most immediately located.

You can use the following functions to control operator behavior within parallel regions:
int32 getChannel()
Returns an int32 which is the global index of the parallel channel.
Note: This function returns -1 if the calling operator is not located in a parallel region.
int32 getMaxChannels()
Returns an int32 which is the global maximum number of parallel channels.
Note: This function returns 0 if the calling operator is not located in a parallel region.
int32 getLocalChannel()
Returns an int32 which is the local index of the parallel channel in the closest enclosing parallel region.
Note: This function returns -1 if the calling operator is not located in a parallel region.
int32 getLocalMaxChannels()
Returns an int32 which is the local maximum number of parallel channels, that is, the number of channels in the closest enclosing parallel region.
Note: This function returns 0 if the calling operator is not located in a parallel region.
list<int32> getAllChannels()
Returns a list of int32 values, where each element is the global channel index of one of the parallel regions that contain this operator. The 0th element of the list is the global channel index within the closest enclosing parallel region, which means that:
 getAllChannels()[0] == getChannel()
Note: This function returns an empty list if the calling operator is not located in a parallel region.
list<int32> getAllMaxChannels()
Returns a list of int32 values, where each element is the global maximum number of parallel channels for one of the enclosing parallel regions. The 0th element is the global maximum number of channels for the closest enclosing parallel region, which means that:
 getAllMaxChannels()[0] == getMaxChannels()
Note: This function returns an empty list if the calling operator is not located in a parallel region.
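For example, the following sketch logs the channel information from inside a nested parallel region (the composite names LogChannels, MidLevel, and ChannelDemo are illustrative only, not part of the product). With the widths used here (2 outer, 3 inner), getChannel() returns a value from 0 through 5, getLocalChannel() returns a value from 0 through 2, and getAllChannels() returns a two-element list:

    composite LogChannels(input In; output Out) {
        graph
            stream<int32 i> Out = Custom(In) {
                logic onTuple In: {
                    // With the widths used in ChannelDemo, getChannel() is
                    // 0 through 5, getLocalChannel() is 0 through 2, and
                    // getAllChannels()[0] == getChannel().
                    printStringLn("global=" + (rstring)getChannel()
                        + " local=" + (rstring)getLocalChannel()
                        + " all=" + (rstring)getAllChannels());
                    submit(In, Out);
                }
            }
    }

    composite MidLevel(input In; output Out) {
        graph
            @parallel(width=3)
            stream<int32 i> Out = LogChannels(In) {}
    }

    composite ChannelDemo {
        graph
            stream<int32 i> Src = Beacon() {
                param iterations: 1u;
            }

            @parallel(width=2)
            stream<int32 i> Logged = MidLevel(Src) {}

            () as Sink = FileSink(Logged) {
                param file: "channels.dat";
            }
    }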
In addition, you can use two intrinsic functions that affect the fusion of operators in a parallel region. You can use these functions only within the partitionColocation, partitionExlocation, hostColocation, and hostExlocation placement configs.
rstring byChannel()
Indicates that operators within a channel are to be fused together. This function computes a string from the global channel number and the name of the operator on which the placement config is placed. Consider the following example:
    type Data = int32 i;

    composite Par2(input I; output F1) {
        graph
            stream<Data> F1 = Functor(I) {}
    }

    composite Par1(input I; output P2) {
        graph
            stream<Data> P1 = Functor(I) {}

            @parallel(width=3)
            stream<Data> P2 = Par2(P1) {}
    }

    composite Main {
        graph
            stream<Data> Beat = Beacon() { }

            @parallel(width=2)
            stream<Data> P = Par1(Beat) {
                config placement: partitionColocation(byChannel());
            }

            () as Out = FileSink(P) {
                param file: "out.dat";
            }
    }
Due to the way configs propagate, all the operators in the subgraph defined by the invocation of Par1 see a partitionColocation config with the string "P_" + (rstring)getChannel(). This ensures that one partition contains all the operators from global channel 0 of the outer parallel region and global channels 0 through 2 of the inner parallel region, and another partition contains all the operators from global channel 1 of the outer parallel region and global channels 3 through 5 of the inner parallel region.
rstring byReplica()
Indicates that replicated operators are fused together across channels. This function computes a string from the name of the operator on which the placement config is placed. Consider the following example:
    type Data = int32 i;

    composite Par2(input I; output F1) {
        graph
            stream<Data> F1 = Functor(I) {}
    }

    composite Par1(input I; output P2) {
        graph
            stream<Data> P1 = Functor(I) {}

            @parallel(width=3)
            stream<Data> P2 = Par2(P1) { 
                config placement: partitionColocation(byReplica());
            }
    }

    composite Main {
        graph
            stream<Data> Beat = Beacon() {}

            @parallel(width=2)
            stream<Data> P = Par1(Beat) {}

            () as Out = FileSink(P) {
                param file: "out.dat";
            }
    }
In this example, all the operators in the subgraph defined by the invocation of Par2 would be placed in the same partition, across all parallel channels. In contrast to byChannel(), fusion crosses channel boundaries here: all six replicas of the Par2 subgraph (three inner channels times two outer channels) share a single partition, while the replicas of P1 are unaffected.

The @parallel annotation also has an option for host pool replication. The host pools specified in the annotation are replicated once for each channel in the region (leaving the original pool unused). When replicated, their tags are extended with the pattern _i, where i is a channel number. For example, if the tag "foo" is a replicated host tag and it appears in a parallel region of width 3, the tags for the channels become "foo_0", "foo_1", and "foo_2".

For nested parallel regions, this scheme is extended by appending the channel index for each enclosing parallel region, from closest to furthest. For example, consider an extension of the first example:
    composite Inner(input In; output C) {
        graph
            stream<int32 i> A = Functor(In) {
                config placement: host(innerHosts); 
            }
            stream<int32 i> B = Functor(A) {
                config placement: host(innerHosts); 
            }
            stream<int32 i> C = Functor(B) {
                config placement: host(innerHosts); 
            }
    }

    composite Outer {
        graph
            stream<int32 i> Src = Beacon() {}
            
            @parallel(width=3, replicateHostTags=["innerHosts"])
            stream<int32 i> Op = Inner(Src) {}

            stream<int32 i> Sink = Custom(Op) {}
    }

    composite Main {
        graph
            @parallel(width=2)
            () as App = Outer() {}

        config hostPool: innerHosts = createPool({tags=["innerHosts"], size=1u}, Sys.Shared);
    }

In this example, the generated host tags have the form "innerHosts_i_j", where i is 0, 1, or 2 (the channel index in the inner region of width 3) and j is 0 or 1 (the channel index in the outer region of width 2). The six generated tags are therefore "innerHosts_0_0", "innerHosts_1_0", "innerHosts_2_0", "innerHosts_0_1", "innerHosts_1_1", and "innerHosts_2_1".