This example shows how to develop a simple Hello World application that uses the stream query feature. The following sections build it up step by step.
A very simple stock monitoring example is used. First, make a StockQuote class to represent a quote of a stock issue.
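import java.io.Serializable;

import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;
// AccessTypeEnum also needs an import; its package is not shown in this example.

// The stream is named stockQuote; attributes are accessed through the fields.
@StreamQueryRelation(name = "stockQuote", accessType=AccessTypeEnum.FIELD)
public class StockQuote implements Serializable {
    private static final long serialVersionUID = 1L;
    private String issue;
    private float price;
    private int volume;

    public StockQuote(String issue, float price, int volume) {
        this.issue = issue;
        this.price = price;
        this.volume = volume;
    }

    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o instanceof StockQuote) {
            StockQuote other = (StockQuote) o;
            return (issue == null ? other.issue == null : issue.equals(other.issue)) && price == other.price
                    && volume == other.volume;
        }
        return false;
    }

    // Kept consistent with equals: 0.0f and -0.0f compare equal, so both hash to 0.
    public int hashCode() {
        int result = (issue == null) ? 0 : issue.hashCode();
        result = 31 * result + (price == 0.0f ? 0 : Float.floatToIntBits(price));
        result = 31 * result + volume;
        return result;
    }
}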
The StockQuote class represents a snapshot of an issue at a certain price, together with the transaction volume. It is annotated with StreamQueryRelation to indicate that the stream name is stockQuote.
A stock analyst is interested in the unweighted average price over the last 5 minutes, and writes a query to derive the view from the stream.
1: CREATE VIEW last5MinuteAvgPrice
2: AS
3: SELECT issue, avg(price) as avgPrice FROM
4: (SELECT * FROM stockQuote FETCH LATEST 5 MINUTES)
5: group by issue;
This SPTSQL statement is easiest to read from the inside out. Line 4 creates a view of all the stock quotes in the latest 5 minutes. Line 3 then uses the select clause to select the issue and the average price, computed with the avg function. The average is grouped by issue, as line 5 indicates. Finally, line 1 names this view last5MinuteAvgPrice. The output of the view is therefore a two-column relation, with issue as the key and avgPrice as the value.
The SQL-like syntax that the stream query engine adopts is called Stream Processing Technology Structured Query Language (SPTSQL). Refer to Stream query engine language tutorial for syntax details.
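To make the semantics of the view concrete, the following is a minimal plain-Java sketch of what the query computes: keep only the events from the last 5 minutes, group them by issue, and average the price. It does not use the stream query engine at all. The class WindowedAverageSketch, the TimedQuote type, and the explicit timestamps are hypothetical names introduced for illustration, and the handling of events exactly on the window boundary is an assumption, not documented engine behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedAverageSketch {

    /** Hypothetical stand-in for one stream event: a quote plus its arrival time. */
    static final class TimedQuote {
        final String issue;
        final double price;
        final long timestampMillis;

        TimedQuote(String issue, double price, long timestampMillis) {
            this.issue = issue;
            this.price = price;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Averages the price per issue over the events that arrived within
     * windowMillis before nowMillis. Boundary events are included here,
     * which is an assumption of this sketch.
     */
    static Map<String, Double> averageByIssue(List<TimedQuote> events, long nowMillis, long windowMillis) {
        // Per issue: element 0 holds the running sum, element 1 the event count.
        Map<String, double[]> sumAndCount = new LinkedHashMap<>();
        for (TimedQuote q : events) {
            long age = nowMillis - q.timestampMillis;
            if (age < 0 || age > windowMillis) {
                continue; // outside the window, like FETCH LATEST 5 MINUTES
            }
            double[] acc = sumAndCount.get(q.issue);
            if (acc == null) {
                acc = new double[2];
                sumAndCount.put(q.issue, acc);
            }
            acc[0] += q.price;
            acc[1] += 1;
        }
        // One output row per issue: issue is the key, the average is the value.
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        long now = 10L * 60 * 1000; // pretend the clock reads minute 10
        List<TimedQuote> events = new ArrayList<>();
        events.add(new TimedQuote("IBM", 90.0, now - 6L * 60 * 1000)); // older than 5 minutes
        events.add(new TimedQuote("IBM", 95.0, now - 3000));
        events.add(new TimedQuote("IBM", 96.0, now - 2000));
        events.add(new TimedQuote("IBM", 100.0, now - 1000));
        events.add(new TimedQuote("CSCO", 6.0, now - 1500));
        events.add(new TimedQuote("CSCO", 6.2, now - 500));
        System.out.println(averageByIssue(events, now, 5L * 60 * 1000));
    }
}
```

For IBM, the three quotes inside the window give (95 + 96 + 100) / 3 = 97, while the six-minute-old quote is ignored. The unweighted average counts every quote once, regardless of its volume.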
Following is a very simple application to generate the stock quote updates:
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.streamquery.StreamQuerySet;
import com.ibm.websphere.projector.Tuple;
import com.ibm.websphere.projector.md.EntityMetadata;
import com.ibm.websphere.projector.md.TupleMetadata;
public class StreamQueryApp1 {
    public static void main(String[] args) throws Exception {
        // Create a local ObjectGrid and define the stream map and the view map.
        ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();
        ObjectGrid og = ogManager.createObjectGrid("og1");
        og.defineMap("stockQuote");
        og.defineMap("last5MinuteAvgPrice");

        // Describe the stream and the view, then deploy the stream query set.
        StreamQuerySet sqSet = og.createStreamQuerySet();
        sqSet.addStreamMetadata().setValueClass(StockQuote.class);
        sqSet.addViewMetadata().setMapName("last5MinuteAvgPrice").setSql(
                "CREATE VIEW last5MinuteAvgPrice AS " + "SELECT issue, avg(price) as avgPrice FROM "
                        + "(SELECT * FROM stockQuote FETCH LATEST 5 MINUTES) group by issue;");
        sqSet.setDeployed(true);

        Session session = og.getSession();
        ObjectMap streamMap = session.getMap("stockQuote");
        ObjectMap viewMap = session.getMap("last5MinuteAvgPrice");

        // Each insert or update on the stream map produces a stream event.
        streamMap.insert("IBM", new StockQuote("IBM", 95.00f, 1000));
        streamMap.update("IBM", new StockQuote("IBM", 96.00f, 2000));
        streamMap.update("IBM", new StockQuote("IBM", 100.00f, 3000));
        streamMap.insert("CSCO", new StockQuote("CSCO", 6.00f, 10000));
        streamMap.update("CSCO", new StockQuote("CSCO", 6.20f, 2500));

        // Give the stream query engine time to derive the view results.
        Thread.sleep(1000);

        // The view map stores tuples: build a key tuple per issue and read the value tuple.
        EntityMetadata emd = viewMap.getEntityMetadata();
        TupleMetadata keyMD = emd.getKeyMetadata();

        Tuple ibmKey = keyMD.createTuple();
        ibmKey.setAttribute(0, "IBM");
        Tuple ibmValue = (Tuple) viewMap.get(ibmKey);
        System.out.println("Average price in the latest 5 minutes for IBM: " + ibmValue.getAttribute(0));

        Tuple cscoKey = keyMD.createTuple();
        cscoKey.setAttribute(0, "CSCO");
        Tuple cscoValue = (Tuple) viewMap.get(cscoKey);
        System.out.println("Average price in the latest 5 minutes for CSCO: " + cscoValue.getAttribute(0));

        // Undeploy the stream query set when done.
        sqSet.setDeployed(false);
    }
}
This is a very simple stream query Hello World application. It first initializes a local ObjectGrid with the name og1. Then it defines the ObjectGrid maps for both the stream and the view. Next, it creates a StreamQuerySet and adds the stream metadata and the view metadata to it.
A stream query set consists of one or more streams and views, where views use only the streams and other views defined in this set. A stream is an input to the stream query engine, and a view is an output or intermediate output of the stream query engine. A StreamQuerySet object serves as a containment object for the stream and view metadata. It can be deployed or undeployed by calling setDeployed(boolean). Refer to the StreamQuerySet API Documentation for a detailed description.
The value class for the stream is set to StockQuote.class, so ObjectGrid knows how to generate the stream events for the objects stored in the stockQuote map. It maps the object attributes to the columns defined in the SQL. For the view, the SQL statement and the name of the map that stores the results are set. The data stored in the view map is in tuple format. Refer to Entity Manager for descriptions of Tuple, TupleMetadata, and EntityMetadata. Refer to the Stream API Documentation and the View API Documentation for detailed descriptions.
The quotes in the stockQuote map are inserted and updated. For simplicity, only two stock issues are used: IBM and CSCO. The stream query engine then generates the quote streams and calculates the view results.
The program sleeps for a short period of time (1 second in this example) to give the stream query engine time to process the events and derive the results. It then retrieves the EntityMetadata for the view map last5MinuteAvgPrice, uses the normal ObjectMap API to get the values, and prints them out:
Average price in the latest 5 minutes for IBM: 97.0
Average price in the latest 5 minutes for CSCO: 6.1
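A fixed one-second sleep works for a demonstration, but it is a guess about how long the engine needs. One possible alternative, sketched below in plain Java, is to poll the lookup until it returns a value or a timeout expires. The helper awaitNonNull and the class ViewPollingSketch are hypothetical and not part of ObjectGrid. The sketch also assumes that a lookup on the view map returns null while no result row exists yet. The lambda in main only simulates such a lookup.

```java
import java.util.function.Supplier;

public class ViewPollingSketch {

    /**
     * Calls lookup repeatedly until it returns a non-null value.
     * Throws IllegalStateException if nothing arrives within timeoutMillis.
     */
    static <T> T awaitNonNull(Supplier<T> lookup, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            T value = lookup.get();
            if (value != null) {
                return value;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException("no result within " + timeoutMillis + " ms");
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        long readyAt = System.currentTimeMillis() + 200;
        // Simulated lookup: returns null until 200 ms have passed.
        Supplier<String> lookup = () -> System.currentTimeMillis() >= readyAt ? "97.0" : null;
        System.out.println("Result: " + awaitNonNull(lookup, 2000, 50)); // prints "Result: 97.0"
    }
}
```

In the application above, the supplier would wrap the view map lookup for a key tuple, with the checked exception of the lookup handled inside the supplier. Polling only shows that a row exists. It does not guarantee that every stream event has already been folded into the average.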
Note: In order to run stream query applications, you need the following jar files:
- objectgrid jar file
- ogstreamquery.jar
- castor.jar
- commons-io.jar
- tools.jar from JDK
The first four jar files are shipped with ObjectGrid. The last jar file is shipped with the JDK. Make sure that these jar files are in your classpath when you run the program.
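A missing jar file usually shows up as a ClassNotFoundException or NoClassDefFoundError at run time. The following sketch probes the classpath before the application starts. ClasspathCheckSketch and isOnClasspath are hypothetical names. The two class names are taken from the import statements of the example. Which jar file contains each class is not stated here, and the sketch does not cover castor.jar, commons-io.jar, or tools.jar.

```java
public class ClasspathCheckSketch {

    /** Returns true if the named class can be loaded, without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (LinkageError e) {
            // The class was found, but something it depends on is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names as imported by the example application.
        String[] required = {
            "com.ibm.websphere.objectgrid.ObjectGridManagerFactory",
            "com.ibm.websphere.objectgrid.streamquery.StreamQuerySet"
        };
        for (String name : required) {
            System.out.println((isOnClasspath(name) ? "found    " : "MISSING  ") + name);
        }
    }
}
```

Run the check with the same classpath that you intend to use for the stream query application.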
This demonstrates a very simple stream query example. The view result keeps changing for as long as the stream map continues to receive updates. It is recommended that you attach an ObjectGridEventListener to the view map to receive all the view data changes. You can then build your own monitoring tool, in either a tabular or a graphical format, to display these results.
In the next sections, more capabilities of stream query are presented.
© Copyright IBM Corporation 2007,2009. All Rights Reserved.