1. Introduction
1.1 Purpose
The purpose of this tutorial is to teach the SQL programming model of the stream query engine. The stream query engine used by ObjectGrid is called the Stream Processing Technology (SPT) engine. This section introduces the concepts underlying the SPT engine, and the syntax and semantics of SPT's specification language SPTSQL. During the course of the tutorial, you will see numerous specific examples of SPT stream and view specifications corresponding to different problem requirements. At the end of the tutorial, you should be able to formulate your own problem requirements into SPTSQL stream and view specifications.
1.2 Concepts
1.2.1 SPT execution engine: streams and views
The SPT engine is middleware for integrating information providers (producers) with information consumers. At runtime, information providers connect to one or more streams in an SPT system and, over time, deliver events - structured data - to these streams. Information consumers connect to one or more views in an SPT system. Each view holds state derived from the events in one or more streams. As new events are delivered to these streams, the states may change; these changes are delivered to information consumers as notifications. A consumer can also examine the state of a view programmatically (see Section 1.2.4).
1.2.2 Publish-subscribe: anonymity and separability
SPT is an extension of the publish-subscribe paradigm, a widely used paradigm in event-based systems. It is characterized by anonymity and separability. Anonymity means that:
- information producers are not required to know who is consuming the information they produce, or specify how to deliver it;
- symmetrically, information consumers are not required to know who is providing the information.
Separability means that the only programming required is for each separate consumer to specify what it wants to receive. No one has to explicitly write centralized logic to route information from producers to the consumers that need it.
SPT differs from conventional publish-subscribe systems in that subscribers can subscribe to derived state, not just to the original streams or to filtered subsets of these streams. Because SPT extends the publish-subscribe paradigm, information providers are sometimes called publishers, consumers are called subscribers, and view specifications are called subscriptions.
1.2.3 SPTSQL: stream specifications and view specifications
The SPTSQL language defines exactly two types of specification statements for the use of application integrators: stream specifications that define the structure of streams, and view specifications (sometimes called queries) that define how to derive views from streams or from other views. For example, the stream specification:
The stream specification defines a stream named StockTrades and provides its schema, that is, the names and types of the values within events that are published to the stream at runtime. In this example, each stock trade event contains a timestamp t, an issue name (issue), a current trading price (price), and the number of shares traded (volume). A stream is always keyed on the timestamp.
Similarly, the following shows the view specification:
The view specification defines a view named Example1 derived from the StockTrades stream. At runtime, this view will track the average and current price, and the total volume of the five most actively traded stocks, based upon the activity during the most recent two minutes of trading.
 | Useful Information
In SPT, views change automatically as a result of changes to the streams or views they depend upon, but views are never explicitly modified by clients at runtime. |
In Example1 the consumer wrote a single view specification that applied three transformations to the stream StockTrades:
- Narrowing the original stream to a sliding window of the last 2 minutes of trades;
- Computing a tuple containing average and current price, and total volume for each group of trades having the same issue;
- Selecting the 5 issues having the highest total volume.
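To make the three transformations concrete, here is a minimal Python model of what Example1 computes. It is a sketch of the semantics only, not SPT's implementation or API: the function name `example1`, the `(t, issue, price, volume)` tuple layout, and the ascending tie-break on issue name are assumptions made for illustration.

```python
from collections import defaultdict


def example1(trades, now, window, k=5):
    """Hypothetical model of Example1's three steps.

    trades: (t, issue, price, volume) tuples, t in ticks (assumed layout).
    now:    the doubt horizon (logical present), in ticks.
    window: window length in ticks (two minutes of ticks in Example1).
    """
    # Step 1: sliding window over the latest `window` ticks.
    recent = [tr for tr in trades if now - window < tr[0] <= now]

    # Step 2: group by issue; average price, current (latest) price,
    # and total volume for each group.
    groups = defaultdict(list)
    for tr in recent:
        groups[tr[1]].append(tr)
    summary = {}
    for issue, rows in groups.items():
        avg_price = sum(r[2] for r in rows) / len(rows)
        current_price = max(rows, key=lambda r: r[0])[2]
        total_volume = sum(r[3] for r in rows)
        summary[issue] = (avg_price, current_price, total_volume)

    # Step 3: top-k by total volume; ties broken on the key (issue name),
    # ascending in this sketch.
    ranked = sorted(summary.items(), key=lambda kv: (-kv[1][2], kv[0]))
    return dict(ranked[:k])
```

Unlike SPT, this function recomputes everything from scratch on each call; SPT maintains the view continuously as events arrive.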
It is also possible for views to be defined in terms of other views. In the following example, a first view defines a window on the trades; a second view defines the average and current price and the total volume for each issue from the first view; and a third view selects the top 5 issues within the second view based on their total volume (yielding the same content as the earlier view Example1). These three view specifications might have been entered by three different consumers:
1.2.4 SPT data model: maps and tables
Although SPT might use various internal representations to hold events, notifications, streams, and views, these representations can vary from one implementation to the next. Therefore, they are kept hidden from writers of specifications and from producers and consumers. The results of queries are defined in terms of two abstractions:
- A map, which is a simple mathematical idea (also the name of the Java interface through which results of queries can be examined by consumers programmatically);
- A table, which is both the conventional way that views are displayed in GUI dashboards and in texts and reports, and is also the familiar abstraction of relational databases.
Mostly, the abstractions are equivalent. In some cases, however, there is information in the map that will be inappropriate to show in a dashboard, yet is available programmatically where needed. Where the two conflict, you should view the map definition as the more rigorous, and the table as the more informal.
All SPT data, both source streams and derived views, has an abstract representation as one or more maps and a possible visualization as a table. Hereafter, the term view is used in an extended sense to denote both source streams and derived views.
1.2.5 Eventual correctness guarantee
SPT is implemented using a messaging engine. Messages take time to travel between producers and consumers. Hence, the state in views may lag behind the true state (the state that would be seen if all views were updated simultaneously with the updates to source streams). Further, the amount of lag can differ between consumers. Finally, a consumer who successively peeks at a view is not guaranteed to see every intermediate state through which that view passed.
What SPT does guarantee, even in the presence of delays and of failures within the distributed SPT engine, are two properties collectively known as eventual correctness:
- Safety: Any promises entailed by a state will be honored: e.g., if a final value is seen, it will not be overridden with a different value; if a range is seen, there will not be a later value outside that range. Furthermore, these promises will be sound - that is, they will be consistent with the application of the view expression to every future value of its inputs.
- Liveness: For any state of the input streams on which a view depends, the user will eventually see the result of the view applied to either that state or to a later state. This means that the user will not have to wait infinitely long to see a particular result.
Eventual correctness is formally defined using SPT's monotonic type system.
1.2.6 SPTSQL vs. SQL
SPTSQL is based upon the relational query language SQL. This tutorial does not assume any prior familiarity with SQL. Even if you are familiar with SQL, you should read through the explanations of SPTSQL: because SQL was originally designed for snapshot queries and SPTSQL for continuous queries, there can be subtle differences. For instance, in SPTSQL unknown and silence are first-class values in addition to the normal integer, string, or float values of stream attributes.
2. SPTSQL Statements
2.1 Stream Specifications
SPT streams contain events published by information producers. Each event from a publisher is augmented with a timestamp assigned by the SPT engine when the event arrives. (In the future, SPT may accept events from publishers that already include timestamps in their events.) The stream is a map from a single key column representing time to as many non-key columns as there are attributes in the event. Attribute types may be integers, floating point numbers, or strings. Strings may have maximum sizes; numeric values may have ranges. Each column name must be a unique identifier.
You define a stream using a CREATE STREAM statement. The format is:
Here, once again, is the stream specification for the stock trades stream:
The value of a stream is a map from ticks of time to a non-key tuple containing the attributes of the event that occurred at that tick. If no event occurred at that tick, the non-key attributes contain final silence. If the tick is in the future, the non-key attributes contain unknown. The current density is 1000 * 2^16 (65,536,000) ticks per second, which makes the probability of two events occurring at the same tick acceptably low. A value of time called the _doubt horizon_ represents the logical present: all ticks after the doubt horizon are unknown. The table representation does not typically show unknown or silent rows. By default, an undisplayed tick earlier than the doubt horizon maps to final silence for all attributes; an undisplayed tick later than the doubt horizon maps to unknown for all attributes. Following is a typical table display for a stream. (For ease of display, the illustrations show unnaturally small numbers for t.)
| t | issue | price | volume |
|---|---|---|---|
| 200 | IBM | 8330 | 200 |
| 400 | MSFT | 2520 | 300 |
| 500 | IBM | 8332 | 600 |
| 520 | IBM | 8331 | 50 |
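The difference between the map view and the table view can be sketched in Python. This is a hypothetical model, not an SPT class: `StreamModel`, the string sentinels, and the rule that publishing at tick t advances the doubt horizon to t are all assumptions made for illustration (in SPT the horizon is advanced by the engine).

```python
SILENCE = "silence"  # final silence: no event at this tick
UNKNOWN = "unknown"  # a tick later than the doubt horizon


class StreamModel:
    """Hypothetical model of an SPT stream as a map from ticks to tuples."""

    def __init__(self, columns):
        self.columns = columns
        self.events = {}          # tick -> tuple of non-key values
        self.doubt_horizon = -1   # ticks <= doubt_horizon are final

    def publish(self, t, values):
        # Assumption: events arrive in increasing tick order, and receiving
        # an event at tick t moves the doubt horizon up to t.
        if t <= self.doubt_horizon:
            raise ValueError("tick %d is already final" % t)
        self.events[t] = tuple(values)
        self.doubt_horizon = t

    def __getitem__(self, t):
        # Map view: every tick has a value, even when no event occurred.
        width = len(self.columns)
        if t > self.doubt_horizon:
            return (UNKNOWN,) * width
        return self.events.get(t, (SILENCE,) * width)

    def table(self):
        # Table view: only visible rows; silent and unknown ticks are omitted.
        return [(t,) + vals for t, vals in sorted(self.events.items())]
```

After publishing the four trades above, looking up tick 300 yields all-silence and tick 600 yields all-unknown, while `table()` lists only the four visible rows.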
The currently supported ways of expanding <type> are:
- String - declared using the SQL syntax: CHAR(<size>) or VARCHAR(<max-size>)
- Integer - declared using the syntax: SMALLINT or INTEGER
- Floating Point - declared using the syntax: DECIMAL or REAL or FLOAT
Numeric types can optionally specify a maximum and minimum value using the syntax: BETWEEN <x> AND <y>. However, these ranges will not be checked at runtime.
2.2 View Specifications
Enter a query defining a new view using this syntax:
The following sections discuss the different queries that can appear in a view specification.
2.3 Adding, Removing, Renaming, Splitting, Combining Columns
The simple SELECT query leaves the number of visible rows in a table unchanged while performing any of the following functions:
- Inserting a new non-key column whose value in each row depends upon some expression based upon other columns in that same row.
- Removing a non-key column.
- Renaming an existing column.
- Splitting a column into two components each with its own name.
- Combining two columns into a single column.
The syntax of <query> is:
The <input-view> is either:
- The name of the view (as stated earlier, this can mean either a stream or a derived view) that is input to this query, or
- An embedded query (<query>) defining an intermediate unnamed view, or
- A join (discussed in Section 2.6).
The <select-list> defines how the columns of the result view are computed from the columns of the input view, and optionally, how they are named. All column names of a view must be unique identifiers. Although a single SELECT statement can request any combination of the five listed functions, we shall explain their operation separately here. A <select-list> is either the character * (which means include all the original columns of the input view without performing any of the five functions), or is a comma-separated list of <derived-column>.
2.3.1 Inserting a new non-key column (EXTEND operation)
This operation adds a column to the table. The values of that column in each row are computed by applying a particular expression to the values of the same-keyed row in the input table.
The syntax of EXTEND is:
Each column of a table must have a unique name. Therefore, it would be an error to specify a new column name that is the same as one that will still exist in the result.
Following is an example view specification, which adds a column worth as the product of price and volume:
Following is the example result:
| t | issue | price | volume | worth |
|---|---|---|---|---|
| 200 | IBM | 8330 | 200 | 1666000 |
| 400 | MSFT | 2520 | 300 | 756000 |
| 500 | IBM | 8332 | 600 | 4999200 |
| 550 | IBM | 8331 | 50 | 416550 |
The expression can involve any of the usual arithmetic or boolean operators applied to columns of the input relation or to constants. The new column will contain silence (or unknown) if the row it is currently in is silence (or unknown) - therefore, any row hidden in the input will be hidden in the output. The value of the new column in each visible row will contain the result of evaluating the given expression using the values of the columns in that row.
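The EXTEND rule, including the propagation of hidden rows, can be sketched in Python. The dictionary layout and the `extend` helper are hypothetical; `None` stands in for a silent row.

```python
SILENCE = None  # stands in for a hidden (silent) row


def extend(view, name, expr):
    """Hypothetical EXTEND: add column `name`, computed by `expr`, to each visible row.

    view: dict mapping a key to a dict of column values, or to SILENCE.
    """
    out = {}
    for key, row in view.items():
        if row is SILENCE:
            out[key] = SILENCE  # hidden in the input, hidden in the output
            continue
        if name in row:
            raise ValueError("column %r already exists" % name)
        out[key] = {**row, name: expr(row)}
    return out


trades = {
    200: {"issue": "IBM", "price": 8330, "volume": 200},
    300: SILENCE,
    400: {"issue": "MSFT", "price": 2520, "volume": 300},
}
with_worth = extend(trades, "worth", lambda r: r["price"] * r["volume"])
```

Here `with_worth[200]["worth"]` is 1666000 and the silent tick 300 stays silent, matching the worth column in the table above.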
The columns referred to in the expression can have different key dependencies; this situation is discussed in Section 2.5.4.
2.3.2 Removing a non-key column (PROJECT operation)
A projection is a view of a table with certain columns omitted. Following SQL syntax, you specify projection simply by listing all the other columns that you want to keep.
Following is an example view specification, which drops column volume from Example3:
Following is the example result:
| t | issue | price | worth |
|---|---|---|---|
| 200 | IBM | 8330 | 1666000 |
| 400 | MSFT | 2520 | 756000 |
| 500 | IBM | 8332 | 4999200 |
| 550 | IBM | 8331 | 416550 |
It is only permitted to drop columns that are non-keys, because the result must always have the form of a map. For instance, suppose some view mapped the key pair [k1, k2] to the non-key pair [n1, n2]. If you deleted all the k2 attributes, you would no longer have a map, because a single value of k1 might map to multiple [n1, n2] pairs. The way to remove a key is to group by some other column and to aggregate the remaining columns over all possible values of that key (see Section 2.5.3 which illustrates how to remove column t2 from Example6).
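The following Python sketch shows why only non-key columns can be projected away. The helpers `drop_nonkey` and `drop_key` are hypothetical, and the small view is an abbreviated form of Example6 keyed on [t1, t2].

```python
def drop_nonkey(view, column):
    """PROJECT away a non-key column; keys are untouched, so the result is a map."""
    return {key: {c: v for c, v in row.items() if c != column}
            for key, row in view.items()}


def drop_key(view, position):
    """Try to remove one key position; fail if the result is no longer a map."""
    out = {}
    for key, row in view.items():
        new_key = key[:position] + key[position + 1:]
        if new_key in out and out[new_key] != row:
            raise ValueError("key %r would map to two different rows" % (new_key,))
        out[new_key] = row
    return out


view = {
    (1, 0): {"issue": "IBM", "volume": 200},
    (2, 0): {"issue": "MSFT", "volume": 300},
    (2, 100): {"issue": "IBM", "volume": 600},
}
```

Dropping `volume` works, but dropping t2 fails because the key (2,) would map to both the MSFT row and the IBM row.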
2.3.3 Renaming a column (RENAME operation)
Renaming is a degenerate case of the EXTEND operation: instead of an expression defining a new column, an existing column is given a new name. Except for the column names, the output of the operation is identical to the input. Renaming is primarily used for two purposes:
- To avoid name conflicts when joining two tables,
- To give columns in different relations the same name, so that a JOIN USING operation can be performed (see Section 2.6.2).
Following is an example view specification:
Following is the example result:
| t | issue | p | worth |
|---|---|---|---|
| 200 | IBM | 8330 | 1666000 |
| 400 | MSFT | 2520 | 756000 |
| 500 | IBM | 8332 | 4999200 |
| 550 | IBM | 8331 | 416550 |
2.3.4 Splitting a column (SPLIT operation)
Often it is desirable to split a column into multiple parts. For example, you might want to split time ticks into days, hours, minutes, seconds, and ticks. The typical reason is that you might want to keep the larger unit as a key while aggregating over the smaller unit. If the original column was a key (non-key), the result columns will be keys (non-keys).
Here is the syntax for the SPLIT operation:
The value <d> is a decimal number used to determine the divisor for the operation. If periods is not specified, the divisor is simply <d> rounded to the nearest integer. Otherwise (and this only makes sense when a time column is being split), <d> is converted to an integer number of ticks by multiplying it by the number of ticks in a second, minute, hour, or day. The column <col> must be of integer type. The output view looks exactly like the input view, except that the single column <col> is replaced by the two columns <new-col1> and <new-col2> (keys if <col> was a key). For each row, if the value of <col> in the input relation is v and the divisor is d, then the value of <new-col1> is v/d (integer division), and the value of <new-col2> is v mod d.
Following is an example view specification:
Following is the example result:
| t1 | t2 | issue | price | volume |
|---|---|---|---|---|
| 1 | 0 | IBM | 8330 | 200 |
| 2 | 0 | MSFT | 2520 | 300 |
| 2 | 100 | IBM | 8332 | 600 |
| 2 | 150 | IBM | 8331 | 50 |
Since the table is now in this form, it is possible to group by t1 and aggregate information within the different 200-tick periods, as shown in Section 2.5.
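The SPLIT arithmetic can be sketched in Python. The unit names in `UNIT_SECONDS` are hypothetical stand-ins for whatever period keywords SPTSQL accepts; the tick density is the 1000 * 2^16 figure quoted in Section 2.1.

```python
import math

TICKS_PER_SECOND = 1000 * 2 ** 16  # 65,536,000, the density quoted in Section 2.1
# Hypothetical unit names; the real SPTSQL period keywords may differ.
UNIT_SECONDS = {"SECONDS": 1, "MINUTES": 60, "HOURS": 3600, "DAYS": 86400}


def divisor(d, unit=None):
    """Divisor for SPLIT: d rounded to the nearest integer, or d time units in ticks."""
    if unit is not None:
        d = d * UNIT_SECONDS[unit] * TICKS_PER_SECOND
    return math.floor(d + 0.5)  # round half up (Python's round() rounds half to even)


def split(rows, col, new_col1, new_col2, d):
    """Replace integer column `col` by (v // d, v % d) in every row."""
    out = []
    for row in rows:
        v = row[col]
        rest = {c: x for c, x in row.items() if c != col}
        out.append({new_col1: v // d, new_col2: v % d, **rest})
    return out
```

Splitting t = 200, 400, 500, 550 with divisor 200 gives the (t1, t2) pairs (1, 0), (2, 0), (2, 100), (2, 150) shown in the table above.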
2.3.5 Combining two columns into a single column (COMBINE)
This operation inverts the SPLIT operation. It takes two integer columns named col1 and col2, of which col2 ranges from 0 to n-1, and produces a combined column col. The value of col in each row of the output table is equal to col1*n+col2, which is exactly the inverse of a SPLIT operation with divisor n. If one of the columns being combined is a key and the other is not, the result is a key. This does not change the number of visible rows, but it does mean that in the map view, all the values of the new key that do not correspond to visible rows in the original table map to silence.
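A short Python check shows that COMBINE undoes SPLIT. Python's built-in `divmod(v, n)` returns `(v // n, v % n)`, which is the SPLIT arithmetic; the helper names are hypothetical.

```python
def combine(v1, v2, n):
    """COMBINE: v1 * n + v2, where v2 must lie in the range 0..n-1."""
    if not 0 <= v2 < n:
        raise ValueError("second column must lie in 0..%d" % (n - 1))
    return v1 * n + v2


def combined_is_key(col1_is_key, col2_is_key):
    # The combined column is a key if either input column is a key.
    return col1_is_key or col2_is_key


# Round trip: splitting with divisor 200 and combining with n = 200 restores t.
restored = [combine(*divmod(t, 200), 200) for t in (200, 400, 500, 550)]
```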
Following is the syntax for the COMBINE operation:
Following is an example view specification, which merges columns t1 and t2 into a single column t:
2.4 Selecting Subsets of Rows (selection, top-k, window)
This section discusses ways of reducing the amount of information in a view by hiding particular rows that are not wanted. In the table representation, this corresponds to not showing those rows. In the map representation, this corresponds to mapping the corresponding key tuple to a value tuple containing silence in all columns. Depending on the query, the silence might be temporary or final. Three row-hiding operations are supported:
- Filtering
- Top-k
- Time-based windowing.
2.4.1 Filtering (WHERE)
Filtering means applying a boolean predicate to each visible row of a table, retaining all rows for which the predicate evaluates to true, and hiding the others. The predicate is an expression that can contain the same operators as in the EXTEND operation. In the map representation, the output map will look like the input map, except that each key tuple whose row failed to match the predicate will map to an output tuple of all silence.
Following is the syntax for the SELECT operation:
Following is an example view specification, which retains all trades with a worth greater than 1,500,000:
Following is the example result:
| t | issue | price | worth |
|---|---|---|---|
| 200 | IBM | 8330 | 1666000 |
| 500 | IBM | 8332 | 4999200 |
As in standard SQL, SPTSQL allows the filter operation to be followed by any of the operations of Section 2.3, with a <select-list> that specifies extending, splitting, combining, projecting, or renaming columns.
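Filtering in the map representation can be sketched in Python: every key survives, but keys whose rows fail the predicate map to silence. The `where` and `visible` helpers are hypothetical, `None` stands in for an all-silence tuple, and only the worth column is kept for brevity.

```python
SILENCE = None  # stands in for a row of all silence


def where(view, predicate):
    """Filter: keep visible rows satisfying `predicate`; map every other key to silence."""
    return {
        key: row if row is not SILENCE and predicate(row) else SILENCE
        for key, row in view.items()
    }


def visible(view):
    """The table view: only the rows that are not silent."""
    return {key: row for key, row in view.items() if row is not SILENCE}


worth = {200: {"worth": 1666000}, 400: {"worth": 756000},
         500: {"worth": 4999200}, 550: {"worth": 416550}}
rich = where(worth, lambda r: r["worth"] > 1_500_000)
```

The map `rich` still has four keys, but only ticks 200 and 500 are visible, as in the table above.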
2.4.2 Top-k (GREATEST/SMALLEST operation)
The top-k operation generates a result table that is derived from the input table by hiding all rows except the k rows with the greatest (or smallest) values of a specified ordering column. If there are only k or fewer visible rows in the input table, then the result table is the same as the input table. If necessary, the values of the key columns are used to break ties between rows having identical values of the ordering column, so that there is a unique deterministic result.
Here is the syntax for the Top-k operation:
The <direction> is either ASC[ENDING] or DESC[ENDING], which results in choosing respectively the smallest k rows or the largest k rows as determined by the value of the ordering column.
 | Useful Information
The ORDER BY syntax is adapted from SQL. It is slightly misleading because rows of a table or entries in a map have no intrinsic order, either in SPT or in relational databases. ORDER BY <col> DESCENDING FETCH FIRST k should be thought of as just a SQL idiom for the top-k operator. It does not guarantee that a display will order the rows in this order. |
Following is an example view specification, which selects the two trades with the smallest worth:
Following is the example result:
| t | issue | price | worth |
|---|---|---|---|
| 400 | MSFT | 2520 | 756000 |
| 550 | IBM | 8331 | 416550 |
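A Python sketch of top-k with deterministic tie-breaking follows. The `top_k` helper is hypothetical, it assumes a numeric ordering column, and its preference for the smaller key on ties is an assumption; the text only says that key values break ties.

```python
SILENCE = None  # stands in for a hidden row


def top_k(view, k, order_col, descending=True):
    """Keep the k visible rows with the greatest (or smallest) numeric order_col.

    Ties are broken on the key so the result is deterministic; preferring
    the smaller key is an assumption of this sketch.
    """
    rows = [(key, row) for key, row in view.items() if row is not SILENCE]
    sign = -1 if descending else 1
    rows.sort(key=lambda kr: (sign * kr[1][order_col], kr[0]))
    kept = {key for key, _ in rows[:k]}
    return {key: (row if key in kept else SILENCE) for key, row in view.items()}
```

Applied to the worth values above with `k=2` and `descending=False`, it keeps ticks 400 and 550 and hides the rest.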
2.4.3 Windowing operation
The windowing operation is a specialized top-k operator that applies to time-keyed tables, such as input streams, or filtered input streams. There are two versions of this operator:
- Those that select the latest k non-silent events, regardless of time. This is just a special case of a top-k where the ordering field is the time column.
- Those that select the latest k final ticks, regardless of whether the ticks contain events or silence. This kind of window is sometimes called a sliding window, because it hides all but the rows whose time column lies within a range between the (moving) present and a fixed number of ticks prior to the present.
Following is the syntax for the windowing operation:
If EVENTS is specified, then the window will select the latest k (non-silent) events. Otherwise, this statement is interpreted as a sliding window request for the latest k ticks (or seconds, or minutes, or hours) regardless of whether they are silent or not.
Following is an example view specification, which selects the latest 120 ticks:
Following is the example result:
| t | issue | price | worth |
|---|---|---|---|
| 400 | MSFT | 2520 | 756000 |
| 500 | IBM | 8332 | 4999200 |
| 550 | IBM | 8331 | 416550 |
As time advances, and the doubt horizon advances, rows with sufficiently early values of t will become silent and disappear from the table representation.
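The two kinds of window can be contrasted in a Python sketch. Both helpers are hypothetical and operate on a dictionary of visible events keyed by tick, using the StockTrades ticks from Section 2.1.

```python
def latest_events(events, k):
    """EVENTS window: the latest k visible events, however far apart in time.

    events: dict mapping tick -> row for the visible (non-silent) events.
    """
    if k <= 0:
        return {}
    return dict(sorted(events.items())[-k:])


def sliding_window(events, doubt_horizon, k):
    """Tick window: events among the latest k final ticks, silent or not."""
    return {t: row for t, row in events.items()
            if doubt_horizon - k < t <= doubt_horizon}


events = {200: "IBM", 400: "MSFT", 500: "IBM", 520: "IBM"}
```

With the doubt horizon at 520, a 150-tick sliding window contains ticks 400, 500, and 520; once the horizon advances to 600, tick 400 drops out, whereas the two latest events stay 500 and 520 until new events arrive.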
2.5 Aggregation with Grouping (GROUP BY)
Aggregation involves the following steps:
- Partition the rows of a table into groups.
- For each group, perform the following:
  - Collect the set of all values of a particular column in that group,
  - Combine those values using an aggregation operation to yield a single result,
  - Assign that result to the value of a new column in a row associated with the group.
In SPT, groups are identified by the distinct values of some columns. Consider one simple aggregation operation, SUM, applied to the Example6 table with the following specification:
2.5.1 Using GROUP BY to partition into groups
The GROUP BY clause specifies what column or columns are used to partition the table into groups. In this case, each distinct value of column t1 defines a group.
| t1 | t2 | issue | price | volume |
|---|---|---|---|---|
| 1 | 0 | IBM | 8330 | 200 |
| 2 | 0 | MSFT | 2520 | 300 |
| 2 | 100 | IBM | 8332 | 600 |
| 2 | 150 | IBM | 8331 | 50 |
Notice that there are two visible partitions, one corresponding to rows with t1=1 and the other to rows with t1=2. (There are partitions for all other values of t1 as well, but their rows are all hidden: either silence or unknown.) Within each partition, you apply the SUM operator to the set of volume values of the visible rows in that partition. In this example, the result is 200 for the t1=1 partition, 950 for the t1=2 partition, and 0 for all the other partitions. The value 0 is said to be the default; rows mapping to that value are not displayed in the table view.
 | Useful Information
SPT interprets the SUM operator exactly like the Sigma operator in mathematics: when applied to a set of two or more non-silent quantities, SUM yields the familiar sum; when applied to a set with a single non-silent quantity, SUM yields the single quantity; when applied to a set with no non-silent quantities, SUM yields 0. Database systems treat the case of an empty set differently.
In this example, the difference does not show. SPT defines the sum of no quantities as 0; databases define it as NULL. But in both cases, the table view does not show the rows. The difference shows up if you join the table with another (see Example18): the joined totals then show up as 0 in SPT but as NULL in most databases. |
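The grouping step and SPT's empty-sum convention can be sketched in Python. The `group_sum` helper is hypothetical, and the price column of Example6 is omitted for brevity. A `defaultdict(int)` returns 0 for any group with no visible rows, which mirrors SPT's definition rather than SQL's NULL.

```python
from collections import defaultdict


def group_sum(rows, group_col, value_col):
    """SUM of value_col for each distinct value of group_col.

    Groups with no visible rows read as 0, matching SPT's empty sum.
    """
    totals = defaultdict(int)
    for row in rows:
        totals[row[group_col]] += row[value_col]
    return totals


example6 = [
    {"t1": 1, "t2": 0, "issue": "IBM", "volume": 200},
    {"t1": 2, "t2": 0, "issue": "MSFT", "volume": 300},
    {"t1": 2, "t2": 100, "issue": "IBM", "volume": 600},
    {"t1": 2, "t2": 150, "issue": "IBM", "volume": 50},
]
totals = group_sum(example6, "t1", "volume")
```

This gives 200 for t1=1, 950 for t1=2, and 0 for any other period.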
The result of this query is two maps: the original, mapping the key tuple [t1, t2] to [issue, price, volume], and a second that maps only [t1] to [total]. Following is a table view of the result:
| t1 | t2 | issue | price | volume |
|---|---|---|---|---|
| 1 | 0 | IBM | 8330 | 200 |
| 2 | 0 | MSFT | 2520 | 300 |
| 2 | 100 | IBM | 8332 | 600 |
| 2 | 150 | IBM | 8331 | 50 |

| t1 | total |
|---|---|
| 1 | 200 |
| 2 | 950 |
This is a case where an operator can increase the number of maps in the result. The display form shown above is said to be normalized (a term from database theory). In current SPT, the map form remains normalized, but the default display as a table is denormalized, that is, collapsed into a single table by equi-joining the two tables. The equi-join of two tables is a new table formed by taking every combination of rows from each input table, provided that the values in the like-named columns are the same. The like-named columns are replaced by a single column. For Example11, the denormalized form of the result is shown in the following table:
| t1 | t2 | issue | price | volume | total |
|---|---|---|---|---|---|
| 1 | 0 | IBM | 8330 | 200 | 200 |
| 2 | 0 | MSFT | 2520 | 300 | 950 |
| 2 | 100 | IBM | 8332 | 600 | 950 |
| 2 | 150 | IBM | 8331 | 50 | 950 |
 | Useful Information
The denormalized form of the table typically has more redundant information than the normalized form does. In this example, total = 950 is shown multiple times. |
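Denormalization is an equi-join of the two maps on their shared column t1. A hypothetical Python sketch using the Example11 values:

```python
def denormalize(detail, totals):
    """Equi-join the [t1, t2] map with the [t1] map on their shared column t1.

    detail: dict (t1, t2) -> (issue, price, volume)
    totals: dict t1 -> total
    """
    return [
        (t1, t2) + values + (totals[t1],)
        for (t1, t2), values in sorted(detail.items())
        if t1 in totals
    ]


detail = {
    (1, 0): ("IBM", 8330, 200),
    (2, 0): ("MSFT", 2520, 300),
    (2, 100): ("IBM", 8332, 600),
    (2, 150): ("IBM", 8331, 50),
}
totals = {1: 200, 2: 950}
table = denormalize(detail, totals)
```

Each detail row picks up the total of its period, which is why 950 appears three times in the denormalized table.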
2.5.2 Aggregation operators
The syntax for an aggregation query is:
The columns in the <group-list> can include both key columns (as in Example11) and non-key columns (as in Example1). In the latter case, an original non-key column becomes a key column in one of the maps. All the possible values of that column's domain that do not appear in the original map are hidden; that is, they map to either temporary or final silence.
We have already shown how SUM applies to all partitions, including the cases of singleton and empty partitions. For the other operators:
- COUNT returns a non-negative integer containing the count of the rows having visible values of the named column; if there are none, COUNT returns 0.
- AVG always returns a floating-point result computed as the quotient of SUM and COUNT for the named column; if COUNT returns 0, then AVG returns silence.
- MAX returns a result having the same base type as the corresponding input column. The value is equal to the value of the largest non-silence in the set. If there is only one non-silence, MAX returns that value. If there are none, MAX returns a value equal to the smallest value in the range of the base type.
- MIN is exactly like MAX, except that it returns the value of the smallest non-silence in the set. If there are no non-silent values in the set, it returns a value equal to the largest value in the range of the base type.
- LATEST applies only to columns in time-keyed relations. It selects from all final and non-silent values of the designated column the one having the greatest value of the time key. If the designated column has no non-silent final values, the value of LATEST is silence.
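The empty-set behavior of each operator can be summarized in a Python sketch. The function names are hypothetical and `None` stands in for silence. Because the range of a base type depends on its declaration (for example a BETWEEN clause), MAX and MIN take the type's bounds as parameters; the 32-bit bounds in the tests are an assumption. For LATEST, each input is a `(tick, value, is_final)` triple, where the `is_final` flag is this sketch's stand-in for SPT finality.

```python
SILENCE = None  # stands in for a silent value


def spt_count(values):
    """COUNT: number of visible (non-silent) values; 0 for an empty set."""
    return sum(1 for v in values if v is not SILENCE)


def spt_sum(values):
    """SUM: 0 for an empty set, as in mathematics."""
    return sum(v for v in values if v is not SILENCE)


def spt_avg(values):
    """AVG: floating-point SUM / COUNT, or silence when COUNT is 0."""
    n = spt_count(values)
    return SILENCE if n == 0 else spt_sum(values) / n


def spt_max(values, type_min):
    """MAX: largest visible value, or the smallest value of the base type."""
    return max((v for v in values if v is not SILENCE), default=type_min)


def spt_min(values, type_max):
    """MIN: smallest visible value, or the largest value of the base type."""
    return min((v for v in values if v is not SILENCE), default=type_max)


def spt_latest(timed_values):
    """LATEST over (tick, value, is_final) triples from a time-keyed relation."""
    candidates = [(t, v) for t, v, is_final in timed_values
                  if is_final and v is not SILENCE]
    if not candidates:
        return SILENCE
    return max(candidates, key=lambda tv: tv[0])[1]
```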
2.5.3 Projecting columns from views with multiple maps
In our earlier examples, views were represented by single maps. Projecting away a key column was disallowed, because the result would no longer be a map. However, if a view has multiple maps, it is legal to project away a key column provided that all the non-key columns of the same map are also projected away. In effect, the projection will discard one or more of the maps. To illustrate this point, suppose we only wanted to keep the total volume in Example11. You could rewrite the query as follows:
The result is simply the single map and table:
The view specification SummariesByIssue, shown at the beginning of this tutorial, was an analogous case. All columns except those depending on issue were discarded, including the time key.
2.5.4 Expressions combining columns from different maps
As mentioned in Section 2.5.1, aggregation with grouping can yield views whose (normalized) representation contains multiple maps. Suppose one adds a new column by applying an EXTEND operation. In the table representation, there is no special problem, because by default there is a single denormalized table. In the map representation, it is possible that an expression combines values belonging to maps keyed by different values. In such a case, the result column will depend upon the union of the keys. This might result in a new map appearing in the result.
2.5.5 Implementation Restriction
In the current implementation of SPT, there is an implementation restriction: though you can combine expressions from different maps, you cannot combine expressions from maps whose keys partially overlap. For instance, in Example11, the keys of one map [t1, t2] partially overlap the keys of the second [t1]. Until this implementation restriction is removed, you will need to rewrite queries that derive new aggregations from such queries. The rewritten queries will use the JOIN operation.
For example, you currently cannot write this query, which partitions Example11 by issue as well as by t1:
Without the restriction, the resulting relation would have two maps:
| t1 | volByPeriod |
|---|---|
| 1 | 200 |
| 2 | 950 |

| issue | volByIssue |
|---|---|
| IBM | 850 |
| MSFT | 300 |
However, it is possible to circumvent this restriction by rewriting this query using the JOIN operation. See Section 2.6.3 for an explanation.
2.5.6 Intermediate and Final Results of Aggregations
SPT computes not only the current value of a view, but also its possible future range. In Example12, the query maps a time period t1 into a total volume total. Observers of the result set will be able to see not only the current value of total, but also its potential upper and lower bound (which depend upon how many not-yet-counted ticks remain in the period). Once all the ticks of a period have been counted, the range will reduce to a single number, and the result will be final. Although ranges and finality do not show up in the default table view, nevertheless the result range, and the finality of the value are both observable programmatically via the map view.
2.5.7 Grouping with Top-K
The GROUP BY construct can be used with the top-k operator as well as with aggregations. The table is partitioned based on the value of the columns in the <group-list>, as before. Then all but the k rows in each partition having the greatest (or smallest) value of the ordering column are hidden. Except for the special case k=1, the key and non-key columns of the result are the same as the key and non-key columns of the original table. For k=1, there is guaranteed to be no more than a single visible row per partition; hence the ordering column can also be used as a key.
Following is an example query, which shows the 2 trades for each issue that have the highest volume:
Following is the example result:
| t | issue | price | volume |
|---|---|---|---|
| 200 | IBM | 8330 | 200 |
| 400 | MSFT | 2520 | 300 |
| 500 | IBM | 8332 | 600 |
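Grouped top-k partitions first and then applies top-k inside each partition. A hypothetical Python sketch over the StockTrades rows, with ties broken on the smaller tick (an assumption of this sketch):

```python
from collections import defaultdict


def group_top_k(rows, group_col, order_col, k, key_col="t"):
    """Within each group, keep the k rows with the greatest numeric order_col.

    Ties are broken on key_col, smaller key first (an assumption).
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_col]].append(row)
    kept = []
    for members in groups.values():
        members.sort(key=lambda r: (-r[order_col], r[key_col]))
        kept.extend(members[:k])
    return sorted(kept, key=lambda r: r[key_col])


trades = [
    {"t": 200, "issue": "IBM", "price": 8330, "volume": 200},
    {"t": 400, "issue": "MSFT", "price": 2520, "volume": 300},
    {"t": 500, "issue": "IBM", "price": 8332, "volume": 600},
    {"t": 520, "issue": "IBM", "price": 8331, "volume": 50},
]
top2 = group_top_k(trades, "issue", "volume", 2)
```

The IBM trade at tick 520 is hidden because two IBM trades have higher volume; MSFT has only one trade, so it is kept.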
2.6 Merging and Joining Tables
In the preceding examples, the queries all derived a new view from a single input view. The JOIN and MERGE operations allow multiple views (including streams) to be combined.
2.6.1 Merge
The MERGE operation is used to combine views from multiple sources into a consolidated view. This operation can merge any pair of views with identical map representations; in practice it is specialized to streams because the probability of two events with the same tick is negligible. The syntax is adapted from the UNION operator in SQL:
The result is a table with the same key and non-key column names as the inputs. A row in the output is visible if it is visible in exactly one of the input views, or if it is visible and identical in both. It is silent if it is hidden in both input views. Because you cannot have multiple events with the same key, it is also silent if (by unlucky coincidence) a row with an identical key is visible in both input views and the non-key values are different. A different approach to handling colliding rows with identical keys is to insert the keyword LEFT or RIGHT in front of UNION. In that case, if both rows are visible, the one to the left (or right) is included in the result. Currently, merge is supported only for streams.
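The visibility rules for MERGE can be written out as a Python sketch. The `merge` helper and its `prefer` parameter are hypothetical stand-ins for UNION, LEFT UNION, and RIGHT UNION; streams are modeled as dictionaries from tick to row, with a missing tick meaning silence.

```python
SILENCE = None  # a missing or None row means silence


def merge(left, right, prefer=None):
    """Model of MERGE over two streams given as dicts tick -> row.

    prefer=None models UNION; "left" or "right" models LEFT/RIGHT UNION.
    Returns only the visible rows of the result.
    """
    out = {}
    for t in left.keys() | right.keys():
        a, b = left.get(t, SILENCE), right.get(t, SILENCE)
        if a is SILENCE or b is SILENCE or a == b:
            row = b if a is SILENCE else a  # visible in one input, or identical in both
        elif prefer == "left":
            row = a
        elif prefer == "right":
            row = b
        else:
            row = SILENCE  # colliding rows with different values
        if row is not SILENCE:
            out[t] = row
    return dict(sorted(out.items()))
```

With the two sample streams below, plain UNION in this model hides tick 520, where both streams have different trades; the LEFT preference keeps the StockTrades row, which matches the choice described for the merged view.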
Following is a straightforward example of merging two input streams:
First, define a second stream, ForeignStockTrades, with a schema identical to that of StockTrades:
Next, you define a merge of the two streams:
Following are some sample values of the streams StockTrades and ForeignStockTrades:
StockTrades:

| t | issue | price | volume |
|---|---|---|---|
| 200 | IBM | 8330 | 200 |
| 400 | MSFT | 2520 | 300 |
| 500 | IBM | 8332 | 600 |
| 520 | IBM | 8331 | 50 |

ForeignStockTrades:

| t | issue | price | volume |
|---|---|---|---|
| 230 | SBUX | 1493 | 200 |
| 410 | MSFT | 2530 | 500 |
| 480 | IBM | 8336 | 400 |
| 520 | IBM | 8335 | 40 |
Following is the merged view:
| t | issue | price | volume |
|---|---|---|---|
| 200 | IBM | 8330 | 200 |
| 230 | SBUX | 1493 | 200 |
| 400 | MSFT | 2520 | 300 |
| 410 | MSFT | 2530 | 500 |
| 480 | IBM | 8336 | 400 |
| 500 | IBM | 8332 | 600 |
| 520 | IBM | 8331 | 50 |
Notice that in this example two trades did occur on the same tick (t = 520). This is rare, since there are 65,536,000 ticks per second in the present implementation. In this case, the value from StockTrades was chosen. To rule out collisions entirely, perform the following steps:
- Append a new column exchange, defined as a number in the range 0..1, to each relation, with the value 0 for rows in the first view and 1 for rows in the second;
- Combine t with exchange into a single key column (for example, as 2*t + exchange);
- Perform the merge. There are guaranteed to be no collisions, since the left table will have only even keys and the right table only odd keys.
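The steps above can be sketched in Python using the sample values of both streams. The combined key 2*t + exchange is an assumption consistent with the even/odd description. The helper names `tag_with_exchange` and `merge_disjoint` are hypothetical.
```python
# Hypothetical sketch: make MERGE keys collision-free by folding an
# exchange number (0 or 1) into the tick, using the key 2*t + exchange.

STOCK_TRADES = {200: ("IBM", 8330, 200), 400: ("MSFT", 2520, 300),
                500: ("IBM", 8332, 600), 520: ("IBM", 8331, 50)}
FOREIGN_STOCK_TRADES = {230: ("SBUX", 1493, 200), 410: ("MSFT", 2530, 500),
                        480: ("IBM", 8336, 400), 520: ("IBM", 8335, 40)}


def tag_with_exchange(view, exchange):
    """Re-key a view so that each key t becomes 2*t + exchange."""
    return {2 * t + exchange: row for t, row in view.items()}


def merge_disjoint(left, right):
    """Merge two views whose keys are known not to overlap."""
    overlap = left.keys() & right.keys()
    if overlap:
        raise ValueError(f"colliding keys: {sorted(overlap)}")
    return {**left, **right}


merged = merge_disjoint(tag_with_exchange(STOCK_TRADES, 0),
                        tag_with_exchange(FOREIGN_STOCK_TRADES, 1))

for key in sorted(merged):
    t, exchange = divmod(key, 2)  # recover the original tick and exchange
    print(t, exchange, merged[key])
```
Both trades at t=520 now survive, under keys 1040 and 1041. Nothing has to be chosen or discarded.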
2.6.2 Join
The JOIN operation is very useful in databases, since it allows the query writer to relate one table to another. It is also very useful in SPT. Its semantics were discussed earlier, in Section 2.5, in connection with converting two or more maps to a single denormalized map. The JOIN resulting from denormalization is automatic; however, a specification can apply JOIN to any pair of views. The result (if viewed as a denormalized table) contains one column for each column displayed in either or both of the input views. It contains one visible row for each possible pair of rows in the input views, provided that, for each column name that appears in both input views and on the USING list, the value of that column in one input view equals the value of the same-named column in the other. The keys of the result table are the union of the keys of the original tables. In the map view, the result of a join is simply the union of the two maps, since the table representation is already defined as the join of the maps.
Useful Information: If there is no USING list, the JOIN is called a full JOIN rather than an equi-join. In database terminology, an SPT join is called an outer JOIN, because all rows are included, not just rows that map to non-silent values.
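The pairing rule in the JOIN definition can be sketched in Python over small in-memory tables. This is a hypothetical model. The function `join` and the list-of-dicts row format are illustrative assumptions. The sketch also ignores the default values that SPT maps supply for missing entries, which the SBUX row later in this section illustrates. The sample rows come from tables elsewhere in this section.
```python
def join(left, right, using=()):
    """Pair rows of two tables following the JOIN pairing rule.

    Rows are dicts mapping column name to value. A pair is kept when it
    agrees on every column named in `using`; with an empty `using` list,
    every pair is kept (a full join). Same-named columns not listed in
    `using` are assumed to have been renamed beforehand, since SPT
    requires the columns of a view to have distinct names.
    """
    return [{**lrow, **rrow}
            for lrow in left
            for rrow in right
            if all(lrow[col] == rrow[col] for col in using)]


VOL_BY_ISSUE = [{"issue": "IBM", "volByIssue": 850},
                {"issue": "MSFT", "volByIssue": 300}]
VOL_BY_PERIOD = [{"t1": 1, "volByPeriod": 200},
                 {"t1": 2, "volByPeriod": 950}]
TRADES = [{"t": 410, "issue": "MSFT", "price": 2530, "volume": 500},
          {"t": 480, "issue": "IBM", "price": 8336, "volume": 400}]

equi = join(TRADES, VOL_BY_ISSUE, using=["issue"])  # equi-join on issue
full = join(VOL_BY_PERIOD, VOL_BY_ISSUE)            # full join: every pair
for row in equi + full:
    print(row)
```
The full join of the two aggregation tables yields one row per (t1, issue) pair. This is the same pairing that underlies the denormalized table in Section 2.6.3.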
As an example, suppose you want to correlate trades on the ForeignStockTrades stream with the volByIssue aggregation from your local StockTrades stream.
First, extract the volByIssue column from Example13 by projecting away all columns other than the pairing of key issue to non-key volByIssue. Next, join ForeignStockTrades with the new result, consolidating the column issue:
Warning: Due to the SPT implementation restriction described in Section 2.5.5, Example13 is not supported. Refer to Section 2.6.3 for workarounds.
Following are the two tables being joined:
volByIssue (projected from Example13):

| issue | volByIssue |
|---|---|
| IBM | 850 |
| MSFT | 300 |

ForeignStockTrades:

| t | issue | price | volume |
|---|---|---|---|
| 230 | SBUX | 1493 | 200 |
| 410 | MSFT | 2530 | 500 |
| 480 | IBM | 8336 | 400 |
| 520 | IBM | 8335 | 40 |
Following is the denormalized result:
| t | issue | price | volume | volByIssue |
|---|---|---|---|---|
| 230 | SBUX | 1493 | 200 | 0 |
| 410 | MSFT | 2530 | 500 | 300 |
| 480 | IBM | 8336 | 400 | 850 |
| 520 | IBM | 8335 | 40 | 850 |
Notice that the volByIssue column for issue SBUX has the value zero. The value is not silent, which would be the database world's interpretation of SUM over an empty set. The row is also not hidden entirely, which would correspond to an "inner" join. SBUX has not yet been traded on StockTrades, and zero is SPT's value for the sum of an empty set of numbers (see Section 2.5).
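The zero default can be modeled in Python by treating volByIssue as a map and supplying 0 for any issue it does not contain. This is a hypothetical sketch: the function `join_vol_by_issue` is illustrative, and the data are the sample rows above. It reproduces the denormalized result table, including the SBUX row.
```python
# Hypothetical model of the join above. volByIssue maps issue to total
# traded volume on StockTrades; an issue missing from the map has not been
# traded, so its total is the sum of an empty set, 0.

FOREIGN_STOCK_TRADES = [
    (230, "SBUX", 1493, 200),
    (410, "MSFT", 2530, 500),
    (480, "IBM", 8336, 400),
    (520, "IBM", 8335, 40),
]
VOL_BY_ISSUE = {"IBM": 850, "MSFT": 300}


def join_vol_by_issue(trades, vol_by_issue):
    """Append volByIssue to each trade, defaulting to 0 for untraded issues."""
    return [(t, issue, price, volume, vol_by_issue.get(issue, 0))
            for t, issue, price, volume in trades]


rows = join_vol_by_issue(FOREIGN_STOCK_TRADES, VOL_BY_ISSUE)
for row in rows:
    print(row)
```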
Useful Information: If you want to exclude rows such as the entry for SBUX, simply apply a filter that shows only those rows with volByIssue greater than 0. The results are the same as an inner join.
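In the same hypothetical Python model, the filter is a single comprehension over the denormalized rows above. Rows whose volByIssue is 0 are dropped, which leaves the rows an inner join would produce for this data.
```python
# Denormalized join result from above: (t, issue, price, volume, volByIssue).
JOINED = [
    (230, "SBUX", 1493, 200, 0),
    (410, "MSFT", 2530, 500, 300),
    (480, "IBM", 8336, 400, 850),
    (520, "IBM", 8335, 40, 850),
]

# Keep only rows with a positive volByIssue; the SBUX row is removed.
inner = [row for row in JOINED if row[4] > 0]
print([row[1] for row in inner])
```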
There are a few syntactic peculiarities relating to joining that SPTSQL inherits from SQL. They are retained in order not to confuse users of SQL who expect them:
- A join operation can be written (a) as a query, (b) using a join-expression within the <input-view> of a SELECT statement, or (c) using a comma-separated list of names or expressions within the input view. Only the first and second forms allow a USING list. The following are all equivalent:
- If the <input-view> is a join expression, rather than a parenthesized subquery, then the column names in the <select-list> may be written in the form <table-name>.<column-name>. This is needed because there might be identically named columns in the tables being joined. The peculiarity is that the following queries are different:
In the first query, a column X from First can be referred to within the select-list as X (if it's not also in Second) or as First.X (unconditionally). In the second query, only unqualified names may appear in the select-list, and any reference to an ambiguous column name is an error. In SPT, the columns of a named view must have distinct names - if inputs to a join have same-named columns and if they are not equi-joined by appearing on the USING list, this rule might entail renaming some columns to guarantee distinctness.
- A table in the join expression can itself be temporarily renamed - this is useful when a table is joined with itself:
Now, a column C can be named in the select-list unambiguously as First.C or Second.C.
2.6.3 Using JOIN as an alternative way to create views containing multiple maps
It was mentioned in Section 2.5.5 that an implementation restriction prohibits aggregation operations on relations, such as Example11, that contain multiple maps with overlapping keys. However, this restriction can be circumvented as follows.
Following is a query that achieves the same result as Example13:
Notice that this re-creates a view containing two separate maps by applying different aggregations to the query of Example6, and combines them using JOIN. The maps now have non-overlapping keys, and it is now possible to perform further operations on the result. For example, if an EXTEND operation uses an expression combining both non-key columns, the result would be part of a third map with the combined key [t1, issue]:
The result now contains three maps:
| t1 | volByPeriod |
|---|---|
| 1 | 200 |
| 2 | 950 |

| issue | volByIssue |
|---|---|
| IBM | 850 |
| MSFT | 300 |

| t1 | issue | diff |
|---|---|---|
| 1 | IBM | -650 |
| 1 | MSFT | -100 |
| 2 | IBM | 100 |
| 2 | MSFT | 650 |
Below is the denormalized table representation of the result of this query. Notice that the result of the diff column is the same in the map and table representations.
| t1 | issue | volByPeriod | volByIssue | diff |
|---|---|---|---|---|
| 1 | IBM | 200 | 850 | -650 |
| 1 | MSFT | 200 | 300 | -100 |
| 2 | IBM | 950 | 850 | 100 |
| 2 | MSFT | 950 | 300 | 650 |
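The third map can be sketched in Python by computing diff = volByPeriod - volByIssue for every (t1, issue) pair of the two single-key maps. The subtraction is inferred from the values in the tables above. The Python names are hypothetical, and the sketch is not SPT's EXTEND implementation.
```python
from itertools import product

VOL_BY_PERIOD = {1: 200, 2: 950}          # map keyed by t1
VOL_BY_ISSUE = {"IBM": 850, "MSFT": 300}  # map keyed by issue

# The third map, keyed by [t1, issue], combines both non-key columns.
diff = {(t1, issue): VOL_BY_PERIOD[t1] - VOL_BY_ISSUE[issue]
        for t1, issue in product(VOL_BY_PERIOD, VOL_BY_ISSUE)}

# Denormalized table: one row per (t1, issue) pair with all columns.
for (t1, issue), d in sorted(diff.items()):
    print(t1, issue, VOL_BY_PERIOD[t1], VOL_BY_ISSUE[issue], d)
```
Because diff depends only on t1 and issue, it takes the same value in the map and table representations.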
3. Sample Applications
In this section, two sample applications are provided. By studying these samples, you will learn how to use SPTSQL to build your own applications.
3.1 Financial: Buy and Sell Bids
All of the previous examples used a single financial input stream, and some of them were contrived to show off particular features of different operators. This section enumerates some queries that might be written against a pair of streams called BuyBids and SellBids. These streams have the same format as StockTrades, but they represent offers to buy and sell some number of shares of some issue at a specified price rather than actual trades.
3.1.1 The input streams:
3.1.2 Aggregating the streams to produce total volumes:
3.1.3 Correlating the totals to produce the difference for each issue-price pair:
3.1.4 Selecting only the differences that exceed a particular threshold:
3.1.5 Selecting a sliding window in time and computing aggregates by issue:
3.1.6 Chopping time into "periods" of 1 minute and computing separate statistics for each period:
3.1.7 Enumerating the latest 5 periods having some buy activity:
3.1.8 Matching the buy and sell bids over all periods and computing difference in total volume:
3.1.9 Viewing these matched bids only over the last 5 periods
3.1.10 Viewing the matches for each period having the greatest 2 differences between buy and sell volumes:
3.2 Systems Management Scenario
3.2.1 The input streams:
3.2.2 Moving average of utilization over last minute
3.2.3 Alert between Bad Messages and high Machine Utilization in same minute
3.2.4 Which are the 3 most frequently alerted machineIDs?
3.2.5 Which userIDs are failing to log on to 3 or more different machines in one period?
3.2.6 Tell me about all security alerts for the "watched" users
3.2.7 Merge multiple streams of Bad messages
3.2.8 Tell me whether some machineID has not reported statistics for a whole period
The straightforward and precise way of writing this query requires the isFinal predicate, which is planned for SPT but not currently implemented:
© Copyright IBM Corporation 2007,2010. All Rights Reserved.