Hello World stream query example
Added by Jian.Tang, last edited by Chris.D.Johnson on Jan 07, 2008  (view change)

See the WebSphere eXtreme Scale Wiki for links to up-to-date eXtreme Scale Version 7.0 documentation.

This example shows how to develop a simple Hello World application that uses the stream query feature. The following sections build it up step by step.
The example is a very simple stock monitor. First, create a StockQuote class to represent a quote for a stock issue.

StockQuote.java
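import java.io.Serializable;

import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;
// AccessTypeEnum must be imported as well; its package is not shown in this example.

@StreamQueryRelation(name = "stockQuote", accessType = AccessTypeEnum.FIELD)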
public class StockQuote implements Serializable {

    private static final long serialVersionUID = 1L;

    private String issue;

    private float price;

    private int volume;

    public StockQuote(String issue, float price, int volume) {
        this.issue = issue;
        this.price = price;
        this.volume = volume;
    }

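    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;

        if (o instanceof StockQuote) {
            StockQuote other = (StockQuote) o;
            return (issue == null ? other.issue == null : issue.equals(other.issue)) && price == other.price
                    && volume == other.volume;
        }

        return false;
    }

    @Override
    public int hashCode() {
        // Uses only issue and volume so that it stays consistent with equals
        return 31 * (issue == null ? 0 : issue.hashCode()) + volume;
    }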
}

A StockQuote object represents a snapshot of an issue at a certain price, together with the transaction volume. The class is annotated with StreamQueryRelation to indicate that the stream name is stockQuote.

A stock analyst is interested in the unweighted average price over the last 5 minutes, and writes a query to derive the view from the stream.

last5MinuteAvgPrice SQL
1: CREATE VIEW last5MinuteAvgPrice
2: AS
3:     SELECT issue, avg(price) as avgPrice FROM
4:         (SELECT * FROM stockQuote FETCH LATEST 5 MINUTES)
5:     group by issue;

The SPTSQL statement is explained line by line: Line 4 selects all the stock quotes from the latest 5 minutes. Line 3 then selects the issue and the average price, which is computed with the avg function. Line 5 indicates that the average is grouped by issue. Finally, line 1 defines this view as last5MinuteAvgPrice. The output of the view is therefore a two-column relation, with issue as the key and avgPrice as the value.
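
To make the semantics of the query concrete, the following plain Java sketch recomputes the same result by hand. It is not how the stream query engine is implemented and it uses no ObjectGrid API: the class SlidingAverageSketch, its nested Quote class, and the explicit timestamps are hypothetical names introduced for illustration. It assumes that quotes arrive in timestamp order and that a quote leaves the window once it is 5 minutes old; the exact boundary behavior of FETCH LATEST is not specified on this page.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: computes by hand what last5MinuteAvgPrice expresses. */
class SlidingAverageSketch {

    /** One stream event: an issue, its price, and an arrival time in milliseconds. */
    static final class Quote {
        final String issue;
        final double price;
        final long timestampMillis;

        Quote(String issue, double price, long timestampMillis) {
            this.issue = issue;
            this.price = price;
            this.timestampMillis = timestampMillis;
        }
    }

    static final long WINDOW_MILLIS = 5 * 60 * 1000L;

    // Quotes currently inside the window, oldest first (assumes in-order arrival)
    private final Deque<Quote> window = new ArrayDeque<>();

    /** Adds a quote, then drops every quote that is 5 minutes old or older. */
    void add(Quote q) {
        window.addLast(q);
        long cutoff = q.timestampMillis - WINDOW_MILLIS;
        while (!window.isEmpty() && window.peekFirst().timestampMillis <= cutoff) {
            window.removeFirst();
        }
    }

    /** Counterpart of: SELECT issue, avg(price) ... group by issue. */
    Map<String, Double> averages() {
        Map<String, double[]> sums = new LinkedHashMap<>(); // issue -> {sum, count}
        for (Quote q : window) {
            double[] sumAndCount = sums.computeIfAbsent(q.issue, k -> new double[2]);
            sumAndCount[0] += q.price;
            sumAndCount[1] += 1;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        SlidingAverageSketch sketch = new SlidingAverageSketch();
        sketch.add(new Quote("IBM", 95.00, 0L));
        sketch.add(new Quote("IBM", 96.00, 1000L));
        sketch.add(new Quote("IBM", 100.00, 2000L));
        sketch.add(new Quote("CSCO", 6.00, 3000L));
        sketch.add(new Quote("CSCO", 6.20, 4000L));
        System.out.println(sketch.averages());
    }
}
```

For the five quotes in main, the IBM average is (95 + 96 + 100) / 3 = 97 and the CSCO average is (6.00 + 6.20) / 2, which is approximately 6.1. The average is unweighted: the volume of each quote plays no part in it.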

The SQL-like syntax that the stream query engine adopts is called Stream Processing Technology Structured Query Language (SPTSQL). Refer to Stream query engine language tutorial for syntax details.

The following very simple application generates the stock quote updates:

StreamQueryApp1.java
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.streamquery.StreamQuerySet;
import com.ibm.websphere.projector.Tuple;
import com.ibm.websphere.projector.md.EntityMetadata;
import com.ibm.websphere.projector.md.TupleMetadata;

public class StreamQueryApp1 {

    public static void main(String[] args) throws Exception {
        ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();

        // Create an ObjectGrid
        ObjectGrid og = ogManager.createObjectGrid("og1");

        // Define the map to contain the stock quote
        og.defineMap("stockQuote");

        // Define the map to contain the average price in the latest 5 minutes.
        og.defineMap("last5MinuteAvgPrice");

        // Add a stream query set to the ObjectGrid
        StreamQuerySet sqSet = og.createStreamQuerySet();

        // Add a stream to the stream query set and set the class name
        // to represent the data stored in the stream map
        sqSet.addStreamMetadata().setValueClass(StockQuote.class);

        // Add a view to the stream query set, then set the name of the view map
        // and the SQL statement that defines the view
        sqSet.addViewMetadata().setMapName("last5MinuteAvgPrice").setSql(
                "CREATE VIEW last5MinuteAvgPrice AS " + "SELECT issue, avg(price) as avgPrice FROM "
                        + "(SELECT * FROM stockQuote FETCH LATEST 5 MINUTES) group by issue;");

        // Deploy the stream query set
        sqSet.setDeployed(true);

        Session session = og.getSession();

        ObjectMap streamMap = session.getMap("stockQuote");
        ObjectMap viewMap = session.getMap("last5MinuteAvgPrice");

        streamMap.insert("IBM", new StockQuote("IBM", 95.00f, 1000));
        streamMap.update("IBM", new StockQuote("IBM", 96.00f, 2000));
        streamMap.update("IBM", new StockQuote("IBM", 100.00f, 3000));

        streamMap.insert("CSCO", new StockQuote("CSCO", 6.00f, 10000));
        streamMap.update("CSCO", new StockQuote("CSCO", 6.20f, 2500));

        // Sleep 1 second to make sure the view data is derived.
        Thread.sleep(1000);

        // The data stored in the view map is in the format of tuples.
        // So we need to create tuple keys to access the tuple values.
        EntityMetadata emd = viewMap.getEntityMetadata();
        TupleMetadata keyMD = emd.getKeyMetadata();
        Tuple ibmKey = keyMD.createTuple();
        ibmKey.setAttribute(0, "IBM");
        Tuple ibmValue = (Tuple) viewMap.get(ibmKey);
        System.out.println("Average price in the latest 5 minutes for IBM: " + ibmValue.getAttribute(0));

        Tuple cscoKey = keyMD.createTuple();
        cscoKey.setAttribute(0, "CSCO");
        Tuple cscoValue = (Tuple) viewMap.get(cscoKey);
        System.out.println("Average price in the latest 5 minutes for CSCO: " + cscoValue.getAttribute(0));

        // Undeploy the stream query to stop all the threads running in the stream query engine
        sqSet.setDeployed(false);
    }
}

This is a very simple stream query Hello World application. It first creates a local ObjectGrid with a name, then defines the ObjectGrid maps for both the stream and the view. Next, it creates a StreamQuerySet, adds the stream and the view to it, and deploys it.

A stream query set consists of one or more streams and views, where views use only the streams and other views defined in this set. A stream is an input to the stream query engine, and a view is an output or intermediate output of the stream query engine. A StreamQuerySet object serves as a containment object for the stream and view metadata. It can be deployed or undeployed by calling setDeployed(boolean). Refer to StreamQuerySet API Documentation for a detailed description.

The value class for the stream is set to StockQuote.class, so ObjectGrid knows how to generate the stream events for the objects stored in the stockQuote map. It maps the object attributes to the columns defined in the SQL. For the view, the SQL statement and the name of the map that stores the results are set. The data stored in the view map is in tuple format. Refer to Entity Manager for descriptions of Tuple, TupleMetadata, and EntityMetadata. Refer to Stream API Documentation and View API Documentation for detailed descriptions.
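
As a rough illustration of what field-based attribute mapping means, the following sketch reads the fields of a value object into a column-name-to-value map by using plain Java reflection. This is an assumption about the general idea only, not the mechanism that ObjectGrid uses internally; FieldAccessSketch, its nested Quote class, and the toColumns method are hypothetical names that do not exist in the product.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: shows fields being read directly, without getter methods. */
class FieldAccessSketch {

    /** Stand-in for the StockQuote value class; it has no getters at all. */
    static final class Quote {
        private final String issue;
        private final float price;
        private final int volume;

        Quote(String issue, float price, int volume) {
            this.issue = issue;
            this.price = price;
            this.volume = volume;
        }
    }

    /** Reads every instance field of the object into a map sorted by field name. */
    static Map<String, Object> toColumns(Object value) {
        Map<String, Object> columns = new TreeMap<>();
        for (Field field : value.getClass().getDeclaredFields()) {
            // Skip constants such as serialVersionUID and compiler-generated fields
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            field.setAccessible(true); // private fields are readable through reflection
            try {
                columns.put(field.getName(), field.get(value));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(toColumns(new Quote("IBM", 95.00f, 1000)));
    }
}
```

The field names issue, price, and volume become the column names, which is why the SQL statement can refer to issue and price even though StockQuote declares no getter methods.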

The quotes in the stockQuote map are inserted and updated. For simplicity, only two stock issues are used: IBM and CSCO. The stream query engine then generates the quote streams and calculates the view results.

The program sleeps for a short period of time (for example, 1 second) to give the stream query engine time to process the events and derive the results. It then retrieves the EntityMetadata for the view map last5MinuteAvgPrice, uses it to create the tuple keys, and uses the normal ObjectMap API to get the values and print them:

StreamQueryApp1 output
Average price in the latest 5 minutes for IBM: 97.0
Average price in the latest 5 minutes for CSCO: 6.1

Note: To run stream query applications, you need the following jar files:

  • objectgrid jar file
  • ogstreamquery.jar
  • castor.jar
  • commons-io.jar
  • tools.jar from JDK

The first four jar files are shipped with ObjectGrid. The last jar file is shipped with the JDK. Make sure that these jar files are in your classpath when you run the program.
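
A missing jar file usually shows up only as a class-loading error at run time, so it can help to check the classpath up front. The following sketch is one possible way to do that. ClasspathCheck and its missing method are hypothetical helpers, not part of ObjectGrid. The ObjectGrid jar itself is left out of the check because its file name is not given above, and wildcard classpath entries are not expanded.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative only: reports which of the named jar files are absent from a classpath. */
class ClasspathCheck {

    // File names taken from the list above; the ObjectGrid jar is omitted
    // because this page does not state its file name.
    static final List<String> REQUIRED =
            Arrays.asList("ogstreamquery.jar", "castor.jar", "commons-io.jar", "tools.jar");

    /** Returns the required jar names that do not appear as entries in the classpath. */
    static List<String> missing(String classpath, String separator) {
        List<String> present = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            present.add(new File(entry).getName()); // keep only the file name
        }
        List<String> missing = new ArrayList<>();
        for (String jar : REQUIRED) {
            if (!present.contains(jar)) {
                missing.add(jar);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String classpath = System.getProperty("java.class.path");
        System.out.println("Missing from classpath: " + missing(classpath, File.pathSeparator));
    }
}
```

The check compares file names only, so it does not verify that the jar files exist on disk or that they are the right versions.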

This demonstrates a very simple stream query example. The view result is updated continuously as long as the stream map is updated continuously. It is recommended that you attach an ObjectGridEventListener to the view map to receive all the view data changes. You can then build your own monitoring tool, in either a tabular or a graphical format, to display these results.
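
The display side of such a monitoring tool can be sketched independently of ObjectGrid. The following example keeps the latest average price per issue and renders the values as a small text table. It deliberately does not implement ObjectGridEventListener: TabularMonitorSketch and its onViewChange method are hypothetical, and onViewChange stands in for whatever code your listener runs when it receives a change to the view map.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: holds the latest view values and prints them as a table. */
class TabularMonitorSketch {

    // Latest average price per issue, sorted by issue name for a stable display
    private final Map<String, Double> latest = new TreeMap<>();

    /**
     * Hypothetical callback: invoke it from your own listener code for each
     * changed view entry. A null average removes the issue from the table.
     */
    synchronized void onViewChange(String issue, Double avgPrice) {
        if (avgPrice == null) {
            latest.remove(issue);
        } else {
            latest.put(issue, avgPrice);
        }
    }

    /** Renders the current state as a two-column text table. */
    synchronized String render() {
        StringBuilder table = new StringBuilder();
        table.append(String.format(Locale.ROOT, "%-8s %10s%n", "ISSUE", "AVG PRICE"));
        for (Map.Entry<String, Double> row : latest.entrySet()) {
            table.append(String.format(Locale.ROOT, "%-8s %10.2f%n", row.getKey(), row.getValue()));
        }
        return table.toString();
    }

    public static void main(String[] args) {
        TabularMonitorSketch monitor = new TabularMonitorSketch();
        monitor.onViewChange("IBM", 97.0);
        monitor.onViewChange("CSCO", 6.1);
        System.out.print(monitor.render());
    }
}
```

The methods are synchronized on the assumption that change notifications and rendering may happen on different threads; if your tool renders on the same thread that receives the changes, the synchronization is unnecessary.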

In the next sections, more capabilities of stream query are presented.

© Copyright IBM Corporation 2007,2009. All Rights Reserved.


 