IBM®
Skip to main content
    Country/region [select]      Terms of use
 
 
    
     Home      Products      Services & solutions      Support & downloads      My account     
 
developerworks > My developerWorks >  Dashboard > WebSphere eXtreme Scale V6.1 User Guide > ... > ObjectGrid Stream Query API > Stream query annotations
developerWorks
Log In   View a printable version of the current page.
Overview Connect Spaces Forums Wikis
Stream query annotations
Added by Jian.Tang, last edited by Chris.D.Johnson on Jan 07, 2008  (view change)
Labels: 

Getting Started Examples Reference API documentation

See the WebSphere eXtreme Scale Wiki for links to eXtreme Scale Version 7.0 documentation.
If you log in with your developerWorks ID, you can leave comments and feedback for the development team.

JDK 5.0 introduced metadata (annotation) support to simplify Java programming model. Stream query supports users to use annotations to configure both streams and views. Since the view results are stored as entity tuples, the view annotation also utilizes entity manager annotations too.

The example shows how to write the weighted average price example using annotations. In order to show more advanced features, create a new stock quote class: ComplexStockQuote.

ComplexStockQuote.java
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;

import java.io.Serializable;

import com.ibm.websphere.objectgrid.streamquery.annotations.AccessTypeEnum;
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryColumn;
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;

@StreamQueryRelation(name = "stockQuote", accessType=AccessTypeEnum.FIELD)
public class ComplexStockQuote implements Serializable {

    private static final long serialVersionUID = 1L;

    @StreamQueryColumn
    private String issue;

    @StreamQueryColumn
    private float price;

    @StreamQueryColumn(name="volume")
    private int currentVolume;

    private long quoteTime;

    public ComplexStockQuote(String issue, float price, int volume) {
        this.issue = issue;
        this.price = price;
        this.currentVolume = volume;
    }

    public boolean equals(Object o) {
        if (this == o)
            return true;

        if (o instanceof ComplexStockQuote) {
            ComplexStockQuote other = (ComplexStockQuote) o;
            return (issue == null ? other.issue == null : issue.equals(other.issue)) && price == other.price
                    && currentVolume == other.currentVolume;
        }

        return false;
    }
}

The ComplexStockPrice class is similar to StockPrice class except:

  1. Annotate the issue, price, and currentVolume fields with @StreamQueryColumn, but not the quoteTime field. This tells the ObjectGrid runtime to generate events for these three fields. Therefore, the stream SQL statement only contains three non-key columns mapped to these three annotated fields. The field quoteTime is not used in stream events.
  2. The @streamQueryColumn annotation of the currentVolume field has an attribute name with value volume. This tells the ObjectGrid stream query runtime to generate a column name called volume (instead of currentVolume) in the stream SQL statement. This allows users to alias fields to SQL column names.

Refer to StreamQueryRelation API Documentation and StreamQueryColumn API Documentation for more details.

Write an AveragePrice class to represent the view result. Remember, the view results are stored in view maps as entity tuples. By providing this class, you tell the entity manager that you could project the tuple into the AveragePrice object. The AveragePrice class follows:

AveragePrice.java
import com.ibm.websphere.objectgrid.streamquery.annotations.AccessTypeEnum;
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;
import com.ibm.websphere.projector.annotations.Basic;
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;

@StreamQueryRelation(
    mapName = "last5MinuteAvgPrice",
    isStream = false,
    sql = "CREATE VIEW last5MinuteAvgPrice AS SELECT issue, totalAmount/totalVolume As avgPrice FROM "
        + "(SELECT issue, sum(amount) as totalAmount, sum(volume) As totalVolume FROM "
        + "(SELECT t, issue, volume, price*volume AS amount FROM stockQuote FETCH LATEST 5 MINUTES) "
        + "group by issue);", accessType = AccessTypeEnum.FIELD)
@Entity(name = "last5MinuteAvgPrice")
public class AveragePrice {
    private static final long serialVersionUID = 1L;

    @Id
    private String issue;

    @Basic(alias = "avgPrice")
    protected float averagePrice;

    public AveragePrice() {
    }

    public AveragePrice(String issue, float averagePrice) {
        this.issue = issue;
        this.averagePrice = averagePrice;
    }

    public boolean equals(Object o) {
        if (this == o)
            return true;

        if (o instanceof AveragePrice) {
            AveragePrice other = (AveragePrice) o;
            return (issue == null ? other.issue == null : issue.equals(other.issue))
                    && averagePrice == other.averagePrice;
        }

        return false;
    }

    public int hashCode() {
        return issue.hashCode();
    }

}

Annotate this class with the @StreamQueryRelation annotation. Set its map name, then mark isStream as false to indicate it for a stream query view. Set the sql statement. This SQL is a bit long, since the intermediate view last5MinuteQuote and last5MinuteAvgPrice used in example 2 are combined into this SQL.

There is another annotation @Entity for this class. This indicates this class also represents an entity. Use @Id and @Basic to indicate the tuple attributes. Alias the field averagePrice to avgPrice, which is a column in the SQL statement. A default constructor as required by the entity manager is provided as well.

Please refer to Relationships with entity manager section for more readings on relationships between stream queries and entities.

Revise the stream query application sample to show how to use EntityManager to get the tuple results. The streamQueryApp3 follows:

StreamQueryApp3.java
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.em.EntityManager;
import com.ibm.websphere.objectgrid.streamquery.StreamQuerySet;

public class StreamQueryApp3 {

    public static void main(String\[\] args) throws Exception {
        ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();

        // Create an ObjectGrid
        ObjectGrid og = ogManager.createObjectGrid("og1");

        // Define the map to contain the stock quote
        og.defineMap("stockQuote");

        // Define the map to contain the average price in the latest 5 minutes.
        og.defineMap("last5MinuteAvgPrice");

        // Add a stream query set to the ObjectGrid
        StreamQuerySet sqSet = og.createStreamQuerySet();

        // Add a stream to the stream query set and set the class name
        // to represent the data stored in the stream map
        sqSet.addStreamMetadata().setValueClass(ComplexStockQuote.class);

        // Add a view to the stream query set and set the class name
        // to represent the data stored in the viewmap
        sqSet.addViewMetadata().setValueClass(AveragePrice.class);

        // Deploy the stream query set
        sqSet.setDeployed(true);

        Session session = og.getSession();

        ObjectMap streamMap = session.getMap("stockQuote");
        ObjectMap viewMap = session.getMap("last5MinuteAvgPrice");

        streamMap.insert("IBM", new ComplexStockQuote("IBM", 95.00f, 1000));
        streamMap.update("IBM", new ComplexStockQuote("IBM", 96.00f, 2000));
        streamMap.update("IBM", new ComplexStockQuote("IBM", 100.00f, 3000));

        streamMap.insert("CSCO", new ComplexStockQuote("CSCO", 6.00f, 10000));
        streamMap.update("CSCO", new ComplexStockQuote("CSCO", 6.20f, 2500));

        // Sleep 1 second to make sure the view data is derived.
        Thread.sleep(1000);

        EntityManager em = session.getEntityManager();
        AveragePrice ibmAvgPrice = (AveragePrice) em.find(AveragePrice.class, "IBM");
        AveragePrice cscoAvgPrice = (AveragePrice) em.find(AveragePrice.class, "CSCO");

        System.out.println("IBM  Average Price=" + ibmAvgPrice.averagePrice );
        System.out.println("CSCO Average Price=" + cscoAvgPrice.averagePrice );
        // Undeploy the stream query to stop all the threads running in the stream query engine
        sqSet.setDeployed(false);
    }
}

Notice that this application simplifies due to the help from annotations: You do not need to set map names or SQL statements for the stream and view. After inserting the data and sleep for 1 second, use EntityManager API to find the results in the view map, then print the price to the system out:

StreamQueryApp3 output
IBM  Average Price in the last 5 minute=97.833336
CSCO Average Price in the last 5 minute=6.04

For more detailed descirption about stream query annotations, please refer to Stream query annotation reference.

Wiki Disclaimer and License
© Copyright IBM Corporation 2007,2009. All Rights Reserved.
Docs Stream query annotation reference (WebSphere eXtreme Scale V6.1 User Guide)


 
    About IBM Privacy Contact