Dynamic application composition
When a composite operator in the source code is intended as the main operator in a compiled application, its graph clause typically invokes operators for edge adaptation, that is, ingesting external data as well as producing data that can be consumed by external parties. In particular, it uses various source and sink operators for I/O to URLs, files, databases, and so on; and it uses the special operators Import and Export for I/O to other jobs on the same streaming middleware.
Streaming applications commonly run for long periods of time. Streams that are generated by one application often serve as input to another, and the second application might be deployed when the first is already running. The Import and Export special operators support dynamic application composition, where one application exports a stream, another application imports the stream, but both applications are instantiated separately and might even be mutually anonymous. For example, the streaming middleware might host one or more long-running backbone applications that carry out the bulk of the data processing, and users might launch transient applications that import streams, for example, to visualize results in a dashboard.
type BargainType = rstring symbol, decimal32 price;
composite ExportingMain {
graph
stream<BargainType> TechSectorBargains = FileSource(){param file:"tech";}
stream<BargainType> HealthCareSectorBargains = FileSource(){param file:"health";}
() as ExportOp1 = Export(TechSectorBargains) {
param properties : { kind="bargains", category="tech",
tickers=["IBM", "GOOG", "MSFT"] };
}
() as ExportOp2 = Export(HealthCareSectorBargains) {
param properties : { kind="bargains", category="healthcare",
tickers=["AET", "UNH", "WLP"] };
}
}
type BargainType = rstring symbol, decimal32 price;
composite ImportingMain {
graph
stream<BargainType> TechBargains = Import() {
param subscription : kind == "bargains" && category == "tech";
}
stream<BargainType> IBMOrWLPBargains = Import() {
param subscription : "IBM" in tickers || "WLP" in tickers;
}
}
Using special operators for dynamic application composition provides a consistent syntax for specifying types and parameters. The middleware connects the Import and Export streams if the subscription predicate matches the exported stream properties. If the Import predicate matches multiple streams that are exported by jobs that are running in the middleware, they are all connected. If there are no matching streams, nothing arrives at the Import operator. Properties can also be added, updated, or removed at run time, and so can subscriptions. The compile-time properties and subscriptions serve as initial settings. Since the compiler sees only one application at a time, it cannot statically check whether the types in publication properties and subscription predicates in other applications match. The output from an Import operator cannot be fed into an input port that expects window punctuation. Also, a port of a down-stream operator that is connected to the output stream of an Import operator stays open forever, no matter how many final punctuation markers it receives.
namespace some.nameSpace;
composite Comp(input E) {
graph () as ExportInvoke = Export(E) {
param streamId : "StreamName";
}
}
public composite ExportingMain {
graph stream<int32 x> A = Beacon() { }
() as CompInvoke = Comp(A) { }
}
composite ImportingMain {
graph
stream<int32 x> I2 = Import() {
param applicationScope : "myApplicationScope";
//application scope selected when exporting application launched
applicationName : "some.nameSpace::ExportingMain";
//main operator selected when exporting application launched
streamId : "StreamName";
//string agreed upon by exporting and importing application
}
}
If the explicit applicationScope is omitted, it is implicitly bound to the scope in which the current application was launched. An explicit applicationScope can be used equally with property-based and name-based subscription. The optional applicationName selects the main operator that is selected when the exporting application is launched. It is only valid when streamId is specified. You cannot run two applications that specify the same streamId; IBM Streams rejects the submission of the second application instance.