IBM InfoSphere Streams Version 4.1.1

Host constraints in parallel regions

You can use both relative host constraints and host pool replication to configure host placement options for parallel channels.

By default, host placement options that you specify for operators in a parallel region apply to all channels in the region. To specify host constraints relative to the channels in a parallel region, use the getChannel() built-in function in the host placement expressions. The host placement configuration options include host, hostColocation, hostExlocation, and hostIsolation. The parallel transformation copies the host placement configuration options from the logical operators to the replicated physical operators. These configuration options constrain whether channels must (hostColocation) or must not (hostExlocation) run on the same host, or whether they run in a partition that has a host of their own (hostIsolation). The parallel transformation resolves the getChannel() function at submission time with the correct channel number for the replicated operator.

To place sibling operators from a parallel region on different sets of hosts, specify the replicateHostTags element in the parallel annotation. The replicateHostTags element uses host tags to determine whether to replicate any of the host pools that are used inside of a parallel region. When a host pool is replicated, it means that a new host pool is created for each channel, and its host tag is updated to indicate the channel that it is in. Each replicated host pool is used by only one channel in a parallel region but it can be used by multiple operators in the same channel. A host pool can also be used by multiple parallel regions, in which case each channel in each parallel region obtains a separate replica of the host pool.

The host placement configuration options hostColocation and hostExlocation can be used with the replicateHostTags element. For example, you can use the replicateHostTags element to ensure that all operators in a channel are on the same set of hosts, but are on a different set of hosts than operators in other channels. The hostColocation and hostExlocation options can then determine where operators are placed within that set of hosts.

Example 1: Combining two different relative host constraints
The objective of this example is to apply the host collocation constraint and the host exlocation constraint within the channel. To achieve this objective, you include the getChannel() SPL built-in function in the expression.
composite Foo2(input In) {
    graph
        stream<Type> A = Functor(In) {
            config placement: hostColocation("AB" + (rstring)getChannel()),
                              hostExlocation("C" + (rstring)getChannel());
        }
        stream<Type> B = Functor(A) {
            config placement: hostColocation("AB" + (rstring)getChannel());
        }
        stream<Type> C = Functor(B) {
            config placement: hostExlocation("C" + (rstring)getChannel());
        }
}

composite Main2() {
    graph
        stream<Type> Src = Source() {}
        @parallel(width=2)
        () as Snk = Foo2(Src) {}
}
The following graphic shows the semantics of the logical and physical views of this example application. By explicitly using the getChannel() SPL built-in function in the tag expression, you can create tags that are unique within a channel. In the graphic, the green lines represent the host collocation restraints, and the red lines represent the host exlocation constraints.

The logical and physical stream graphs for the host placement example of user-defined parallelism.

Example 2: Replicating host pools
The following example application has three different host pools ("inHosts", "compHost" and "outHosts"). The application uses the replicateHostTags option in the parallel annotation to specify how those host pools are replicated in the parallel region.
type Data = int32 id, rstring text;

composite ParRegion(output C) {
graph
  stream<Data> A = Beacon() {
    config placement: host(inHosts);     
  }         
  stream<Data> B = Functor(A) {
    config placement: host(compHosts);     
  }     
  stream<Data> C = Functor(B) {
    config placement: host(outHosts);     
  } 
} 
composite Main { 
graph     
  @parallel(width=5, replicateHostTags=["comp_channel", "out_channel"])  stream<Data> Res = ParRegion() {}
  () as Snk = FileSink(Done) {         
    param file: "results";     
  }      
config hostPool: 
  inHosts = createPool({tags=["in"], size=3}, Sys.Exclusive),
  compHosts = createPool({tags=["comp_channel"], size=3}, Sys.Exclusive),
  outHosts = createPool({tags=["out_channel"], size=3}, Sys.Shared); 
}

The composite ParRegion is replicated five times. The compHosts and outHosts host pools are replicated because their host tags match the tags that are specified in the replicateHostTags element of the parallel annotation. Five new host pools are created from compHosts, and five new host pools are created from outHosts. The replica of compHosts for channel 0 has the tag "comp_channel_0", the replica for channel 1 has the tag "comp_channel_1" and so on, for each channel. The host tags for the inHosts host pool do not match the values in replicateHostTags, so it is not replicated.

After the parallel transformation, all host specifications that referred to either compHosts or outHosts within the parallel region refer to the new host pool that has the appropriate tags for the channel.

Note: The original compHosts and outHosts host pools are not used in the parallel region. Only the replicas of those host pools are used. In this example, no other operator uses the original host pools, so InfoSphere® Streams does not assign any hosts to those pools. In general, other operators can reference these host pools, and if they do, InfoSphere Streams allocates hosts for them.