Nested parallel regions
Nested user-defined parallelism (nested UDP) allows parallel regions to contain other parallel regions in your IBM® Streams applications. Consider the following example:
composite Inner(input In; output C) {
  graph
    stream<int32 i> A = Functor(In) {}
    stream<int32 i> B = Functor(A) {}
    stream<int32 i> C = Functor(B) {}
}
composite Outer {
  graph
    stream<int32 i> Src = Beacon() {}
    @parallel(width=3)
    stream<int32 i> Op = Inner(Src) {}
    stream<int32 i> Sink = Custom(Op) {}
}
composite Main {
  graph
    @parallel(width=2)
    () as App = Outer() {}
}
This example of nested parallelism requires no concepts beyond those already present in parallel regions. However, referring to particular parallel channels in nested parallel regions requires several new concepts.
First are the concepts of local channels and global channels. In the previous example, a parallel region of width 3 is nested within a parallel region of width 2. The nested (inner) parallel region, which by itself has three channels, is replicated twice when the outer parallel region is transformed, resulting in six actual channels within the inner parallel region. So that the inner channels can be uniquely identified, each is given an index in the range 0 to 5, inclusive; this index is referred to as the global channel index. The local channel index does not consider any nesting, so for the inner parallel region it is in the range 0 to 2, inclusive. It follows that, for non-nested parallel regions, the local and global indexes are the same.
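The relationship between the two indexes can be modeled outside of SPL. The following Python sketch is not part of Streams. It assumes that the replicas of the inner region are numbered consecutively, one outer channel after another, which is consistent with the grouping of inner global channels 0 to 2 and 3 to 5 described for byChannel() later in this topic. The function name global_channel is hypothetical.

```python
def global_channel(outer_global: int, inner_width: int, inner_local: int) -> int:
    """Model the global index of an inner channel (assumed numbering scheme)."""
    # Each outer channel owns one full block of inner_width inner channels.
    return outer_global * inner_width + inner_local


OUTER_WIDTH, INNER_WIDTH = 2, 3  # widths taken from the example above

for outer in range(OUTER_WIDTH):
    for local in range(INNER_WIDTH):
        index = global_channel(outer, INNER_WIDTH, local)
        print(f"outer={outer} local={local} global={index}")
```

Under this assumption the six inner replicas receive the global indexes 0 through 5, while each replica's local index stays in the range 0 to 2.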
Second is the concept of the closest enclosing parallel region. For any given operator in a parallel region, the closest enclosing parallel region is the innermost parallel region that contains the operator. The following functions are defined in terms of these concepts:
- int32 getChannel()
- Returns an int32 which is the global index of the parallel channel. Note: This function returns -1 if the calling operator is not located in a parallel region.
- int32 getMaxChannels()
- Returns an int32 which is the global maximum number of parallel channels. Note: This function returns 0 if the calling operator is not located in a parallel region.
- int32 getLocalChannel()
- Returns an int32 which is the local index of the parallel channel in the closest enclosing parallel region. Note: This function returns -1 if the calling operator is not located in a parallel region.
- int32 getLocalMaxChannels()
- Returns an int32 which is the local number of parallel channels. This is the number of local channels in the closest enclosing parallel region. Note: This function returns 0 if the calling operator is not located in a parallel region.
- list<int32> getAllChannels()
- Returns a list of int32 values, where each element of the list is a global channel index that this operator is in, one for each enclosing parallel region. The 0th element of the list is the global channel index within the closest enclosing parallel region, which means that:
getAllChannels()[0] == getChannel()
Note: This function returns an empty list if the calling operator is not located in a parallel region.
- list<int32> getAllMaxChannels()
- Returns a list of int32 values, where each element of the list is the global maximum number of parallel channels for one of the enclosing parallel regions. The 0th element is the global maximum number of parallel channels for the closest enclosing parallel region, which means that:
getAllMaxChannels()[0] == getMaxChannels()
Note: This function returns an empty list if the calling operator is not located in a parallel region.
- rstring byChannel()
- Indicates that operators within a channel are to be fused together. This function computes a string from the global channel number and the name of the operator on which the placement config is placed. Consider the following example:
type Data = int32 i;
composite Par2(input I; output F1) {
  graph
    stream<Data> F1 = Functor(I) {}
}
composite Par1(input I; output P2) {
  graph
    stream<Data> P1 = Functor(I) {}
    @parallel(width=3)
    stream<Data> P2 = Par2(P1) {}
}
composite Main {
  graph
    stream<Data> Beat = Beacon() { }
    @parallel(width=2)
    stream<Data> P = Par1(Beat) {
      config placement: partitionColocation(byChannel());
    }
    () as Out = FileSink(P) {
      param file: "out.dat";
    }
}
Due to the way configs propagate, all the operators that are part of the subgraph defined by the invocation of Par1 will see a partitionColocation with the string "P_" + (string)getChannel(). This ensures that one partition contains all the operators from global channel 0 of the outer parallel region and global channels 0 to 2 of the inner parallel region, and another partition contains all the operators from global channel 1 of the outer parallel region and global channels 3 to 5 of the inner parallel region.
- rstring byReplica()
- Indicates that replicated operators are fused together across channels. This function computes a string from the name of the operator on which the placement config is placed. Consider the following example:
type Data = int32 i;
composite Par2(input I; output F1) {
  graph
    stream<Data> F1 = Functor(I) {}
}
composite Par1(input I; output P2) {
  graph
    stream<Data> P1 = Functor(I) {}
    @parallel(width=3)
    stream<Data> P2 = Par2(P1) {
      config placement: partitionColocation(byReplica());
    }
}
composite Main {
  graph
    stream<Data> Beat = Beacon() {}
    @parallel(width=2)
    stream<Data> P = Par1(Beat) {}
    () as Out = FileSink(P) {
      param file: "out.dat";
    }
}
In this example, all the operators in the subgraph defined by the invocation of Par2 would be in the same partition.
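To make the return values of the channel query functions concrete, the following Python sketch models them for a single operator replica. It is not Streams code: the helper channel_info is hypothetical, both arguments list the enclosing regions with the closest one first, and the numbering of nested replicas is the same assumption used for the byChannel() grouping above (consecutive blocks per outer channel). The sketch also assumes that the element reported for each outer region is that region's own global index.

```python
def channel_info(widths, local_indexes):
    """Model the channel functions for one operator replica.

    widths        -- widths of the enclosing regions, closest region first
    local_indexes -- the replica's local channel in each of those regions
    """
    all_channels, all_max = [], []
    index, total = 0, 1
    # Walk from the outermost region inward, accumulating global values.
    for width, local in zip(reversed(widths), reversed(local_indexes)):
        index = index * width + local
        total *= width
        # Prepend so that element 0 ends up describing the closest region.
        all_channels.insert(0, index)
        all_max.insert(0, total)
    return {
        "getChannel": all_channels[0] if all_channels else -1,
        "getMaxChannels": all_max[0] if all_max else 0,
        "getLocalChannel": local_indexes[0] if local_indexes else -1,
        "getLocalMaxChannels": widths[0] if widths else 0,
        "getAllChannels": all_channels,
        "getAllMaxChannels": all_max,
    }


# An operator in local channel 1 of a width-3 region, nested in
# channel 1 of a width-2 region.
print(channel_info([3, 2], [1, 1]))
# An operator outside any parallel region.
print(channel_info([], []))
```

For the nested case the model yields getAllChannels as [4, 1] and getAllMaxChannels as [6, 2], so element 0 of each list equals the modeled getChannel and getMaxChannels values. Outside any parallel region it yields -1, 0, and empty lists, matching the notes in the function descriptions.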
The @parallel annotation also supports host pool replication through its replicateHostTags option. The host pools that match the tags specified in the annotation are replicated once for each channel in the region (leaving the original pool unused). When the pools are replicated, their tags are extended with the pattern _i, where i is a channel number. For example, if the tag "foo" is a replicated host tag and it appears in a parallel region of width 3, the tags for the channels become "foo_0", "foo_1" and "foo_2". The following example replicates a host tag in a nested parallel region:
composite Inner(input In; output C) {
  graph
    stream<int32 i> A = Functor(In) {
      config placement: host(innerHosts);
    }
    stream<int32 i> B = Functor(A) {
      config placement: host(innerHosts);
    }
    stream<int32 i> C = Functor(B) {
      config placement: host(innerHosts);
    }
}
composite Outer {
  graph
    stream<int32 i> Src = Beacon() {}
    @parallel(width=3, replicateHostTags=["innerHosts"])
    stream<int32 i> Op = Inner(Src) {}
    stream<int32 i> Sink = Custom(Op) {}
}
composite Main {
  graph
    @parallel(width=2)
    () as App = Outer() {}
  config hostPool: innerHosts = createPool({tags=["innerHosts"], size=1u}, Sys.Shared);
}
In the example, the generated host tags will have the form "innerHosts_i_j" where i is either 0, 1 or 2 (for the inner width of 3), and j is 0 or 1 (for the outer width of 2).
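The tag naming described above can be enumerated with a short Python sketch. This is only a model of the naming pattern, not of how Streams generates the tags internally. The helper replicated_tags is hypothetical, and its widths argument lists the region widths in suffix order (inner region first, following the innerHosts_i_j form).

```python
from itertools import product


def replicated_tags(tag, widths):
    """Extend tag with one _<channel> suffix per parallel region.

    widths lists the region widths in suffix order, inner region first.
    """
    combos = product(*(range(width) for width in widths))
    return [tag + "".join(f"_{channel}" for channel in combo) for combo in combos]


# Single region of width 3, as in the "foo" case.
print(replicated_tags("foo", [3]))
# Inner width 3 nested in outer width 2, as in the innerHosts example.
print(replicated_tags("innerHosts", [3, 2]))
```

For the nested example the model produces six tags, one for each combination of inner channel (0 to 2) and outer channel (0 or 1).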