GabrielaJS 270004FR6S Visits (758)
When processing data, it is common to perform data enrichment. Enrichment is useful when the data source contains only partial information, but the analytics require additional information that is available only in other data sources. The InfoSphere Streams database toolkit contains two enrichment operators: ODBCEnrich and SolidDBEnrich. These operators require the data used for enrichment to be in a database.
In this post, we illustrate how to develop an SPL composite that serves as a generic file-based enricher. In this solution, we use a FileSource to scan the enrichment data from a file, and then store it in an in-memory map in a Custom operator. This map is keyed by the attributes used to correlate incoming tuples with the enrichment data. If the enrichment data fully fits in memory, this solution can be more efficient than querying the database every time a tuple must be enriched.
The code below shows a sample invocation of the file-based enricher (operator FileEnrich). In this example, the program generates a stream called Data, which has attributes id and city. Data is then consumed by FileEnrich, which outputs an enriched stream using the id attribute as a key. The output stream contains both Data attributes (id and city) and EnrichT attributes (id and name). Note that because EnrichT and Data share the id attribute, id appears only once in the EnrichedData stream. The FileEnrich operator receives the following parameters:
We now show the code for the generic FileEnrich composite operator. This composite is developed using 2 primitive operators and 1 Custom operator. The first operator is a FileSource (line 11). The FileSource uses the enrichmentFile parameter and produces a stream of type enrichmentType. Using a parameter to establish the type of the FileSource output stream gives users the option to use a CSV file with any set of attributes. The second operator is a Switch (line 16), which serves exclusively to control when the input stream (In) can start flowing into the downstream operator. By default, this operator has an initial status of false (i.e., blocking tuples). The status parameter indicates the action taken once a tuple arrives in the second input stream. In this case, a true value indicates that the switch will open when a tuple arrives. The third operator (lines 21-22), implemented as a Custom, is the one responsible for doing the data enrichment itself.
The Custom operator has two phases of execution. First, it builds a map based on EnrichmentData, the stream generated by the FileSource. To create this map, this operator uses the enrichmentKeyType and the enrichmentType itself (line 25). To populate the map, the operator uses the function getT
The C++ code below shows the implementation of this function. This function returns an error flag in two cases: (i) if the attribute provided as a string does not exist, and (ii) if the attribute type does not match the type of value.
The second stage of execution happens after the stream produced by the FileSource is fully processed, which is indicated by a final punctuation. At this point, the Custom operator notifies the Switch (line 38), and the data enrichment process starts. For enrichment, the Custom operator merges the attributes of Data and EnrichmentData using the assignFrom function (lines 45-46). This function assigns all matching fields from one tuple to the other, so be careful when naming the attributes of the enrichment type. If attribute names overlap, the last assigned value will prevail, which in this case is the value available in the enrichment tuple (line 46).
In summary, the FileEnrich composite has three characteristics that make it generic:
Thanks to Bugra Gedik for this example!
scott.s 2700060BQD Visits (938)
Some operators can be implemented directly in SPL using Custom operators, rather than defining them as native operators in C++ or Java. Implementing logic using Custom operators is typically suitable for operations that do not need to call out to pre-existing libraries, and whose logic operates on, and can be fully expressed with, SPL data types. Writing Custom operators requires less code and simplifies development as there is no need to switch to another language and write an operator model.
For example, consider parsing system messages from /var/log/messages on a Linux system. A typical example looks like the following:
While there is no formal grammar, the structure of a message is:
We would like to write an SPL operator that parses such messages. The input tuples will contain a single string, which contains a single message. The output will be a tuple where each entry in our informal grammar above is its own string attribute:
Our example application reads messages directly from /var/log/messages, line by line. To parse these messages, we separate the raw line into tokens separated by spaces. From there, we can associate indices with attributes. For the date attribute, we know that entries 0, 1 and 2 are part of the date, so we combine them (effectively un-tokenizing them). The message itself can have any number of tokens separate by spaces, but we know that it must start at index 5.
Implementing this logic directly in SPL with a Custom operator is easier than dropping down to C++ or Java. However, as written, there is a problem: we cannot reuse this operator. If we wanted to parse messages in the same way in this or in another application, we would need to copy this code. It exists only inside of this Custom operator; there is no way to "name" this custom logic.
Wrapping Custom operators in composites solves this problem. By making a Custom operator invocation the only part of a composite's stream graph, we can use the composite operator as a way to "name" that logic. For example:
Note that the ParseMessages composite knows about the ServiceMessage type. Because of this fact, our composite is not type generic. In future posts, we will explore the various kinds of genericity available to composites.
We can now use ParseMessages in our application:
The operator ParseMessages can now be reused elsewhere in the application, and we can place it in toolkits so that other applications can make use of it.