scott.a.s 2700060BQD Visits (3374)
In a previous post, we looked at the practice of wrapping Custom operators in composites. We observed that the result was not generic:
Technically, however, the above code does have some type genericity, just not much. We will explore what that genericity is in a moment, but first let's go over the kinds of genericity there are in SPL, and which ones can apply to a composite operator.
Operators that can handle any number of input and output ports are port number generic. Composites cannot be port number generic; composites must define the exact number of input and output ports they provide. Primitive operators, however, can be port number generic. In our example, ParseMessages defines one input and one output port.
Composites can be type generic, which means that they can handle streams of any type. ParseMessages is not type generic, because the type of the stream Out is fully specified to have the type ServiceMessage.
The type for In, however, is partially type generic. The type is not fully specified, although we have made one assumption about it: that it contains an attribute named raw that is of type rstring. In the example application we previously developed, that attribute was the only attribute in the stream type, but that does not need to be true in general. For example, we could invoke ParseMessages in this way:
Even though ParseMessages does not know about the attributes processedTime and networkName, it can still handle the type AugmentedRawMessage on its input stream because it has an rstring attribute named raw.
However, we can still make ParseMessages attribute generic. We can do this by modifying the composite to take an attribute as a parameter:
Composites that take attributes as parameters are attribute generic because they make no assumptions about an attribute's name. The type of the attribute, however, cannot be generic. In the above version of ParseMessages, the attribute we provide upon invocation must have type rstring, like this example:
If we tried to provide an attribute that was not an rstring (such as processedTime), the compiler would raise an error the first time it tried to use the inappropriately typed attribute.
Using a similar idea, we can still make ParseMessages even more generic. While we want to ensure that the output stream has the specific attributes date, hostname, service and message, there is no reason for us to *limit* the output stream's type to those attributes. However, because we fully specified the type name, we have forced that to be the case. We can remove that restriction by not fully specifying the type:
Note that we can no longer create a tuple literal of our output tuple type - creating a tuple literal requires knowing the full type of a tuple, but we want to remain partially type generic. To do so, we only require the out the output's stream type contains date, hostname, service, message and that they are type rstring. We invoke this composite in this way:
The remaining kind of genericity is operator genericity, which is possible when a composite takes an operator as its parameter. We will cover operator genericity in a future post.
scott.a.s 2700060BQD Visits (3417)
Some operators can be implemented directly in SPL using Custom operators, rather than defining them as native operators in C++ or Java. Implementing logic using Custom operators is typically suitable for operations that do not need to call out to pre-existing libraries, and whose logic operates on, and can be fully expressed with, SPL data types. Writing Custom operators requires less code and simplifies development as there is no need to switch to another language and write an operator model.
For example, consider parsing system messages from /var/log/messages on a Linux system. A typical example looks like the following:
While there is no formal grammar, the structure of a message is:
We would like to write an SPL operator that parses such messages. The input tuples will contain a single string, which contains a single message. The output will be a tuple where each entry in our informal grammar above is its own string attribute:
Our example application reads messages directly from /var/log/messages, line by line. To parse these messages, we separate the raw line into tokens separated by spaces. From there, we can associate indices with attributes. For the date attribute, we know that entries 0, 1 and 2 are part of the date, so we combine them (effectively un-tokenizing them). The message itself can have any number of tokens separate by spaces, but we know that it must start at index 5.
Implementing this logic directly in SPL with a Custom operator is easier than dropping down to C++ or Java. However, as written, there is a problem: we cannot reuse this operator. If we wanted to parse messages in the same way in this or in another application, we would need to copy this code. It exists only inside of this Custom operator; there is no way to "name" this custom logic.
Wrapping Custom operators in composites solves this problem. By making a Custom operator invocation the only part of a composite's stream graph, we can use the composite operator as a way to "name" that logic. For example:
Note that the ParseMessages composite knows about the ServiceMessage type. Because of this fact, our composite is not type generic. In future posts, we will explore the various kinds of genericity available to composites.
We can now use ParseMessages in our application:
The operator ParseMessages can now be reused elsewhere in the application, and we can place it in toolkits so that other applications can make use of it.
GabrielaJS 270004FR6S Visits (3687)
A new feature of InfoSphere Streams 3.0 allows dynamic filter expressions for applications that use
A recurring application sharing scenario is when different consuming applications are interested in processing different subsets of the exported stream. Prior to Streams 3.0, the importing application would receive all tuples available in the exported stream. This would result in waste of network resources, as the whole stream was transmitted but a filtering operation on the consuming application side would immediately discard many tuples. In a scenario where there are many different consumers, transferring the full stream multiple times wastes a significant amount of resources.
To reduce network transfers, developers can take advantage of dynamic filter expressions in Import operators. During application instantiation, the filtering expression is effectively shipped to the Export operator side. During runtime, the Export operator evaluates the filtering expression to decide which tuples should be transmitted to the consuming application.
The following figures show some SPL code using this new feature. All examples use a stream of type Schema declared as "int64 streamSubset, rstring stringSubset, uint32 random".
The segment below shows an Export operator that exports the stream produced by a Custom operator, which consumes a stream produced by a FileSource operator. In this example, the Custom operator forwards downstream all incoming tuples without doing any specific transformation. In reality, developers may substitute this operator with any arbitrary SPL topology. The invocation of the Export operator does not need to change from prior versions of Streams.
The file "sample.dat" has the following 10 lines:
In the Import side, one must now use the filter parameter, as in the example below. This instance of the Import operator receives only tuples where the streamSubset attribute has value 1 and the stringSubset attribute has value “streams”. This filtered, imported stream is processed by a Custom operator, which then sends the output directly to a FileSink. As in the example above, the Custom operator just illustrates a sample topology. The filter parameter in Import allows the construction of more complex expressions, similar to the subscription parameter.
The figure below shows the Streams instance graph when running the applications above. To illustrate the power of the dynamic filter expressions, we also run two other applications. The applications are similar to the importer application above, but use the following filtering expressions:stringSubset == "sources" || stringSubset == "sinks"
streamSubset == 2 && stringSubset == "operator"
As highlighted by the red rectangle, the Custom operator of one of the importing applications (right side) receives only 2 out of the 10 tuples submitted to the Export operator (10 lines in ‘sample.dat’). The filtering for the “streams” keyword allows 4 tuples to be transmitted and the filtering for “sources” or “sinks” keywords allows 3 tuples to be transmitted. The total number of tuples transmitted by the Export operator using filtering is 9, while a configuration without filtering would transfer 30 tuples.
Summary: When the application consumes only a subset of the tuples of an exported stream, use the filter parameter of the Import operator to save network resources.