JDK 5.0 introduced metadata (annotation) support to simplify the Java programming model. Stream query lets you use annotations to configure both streams and views. Because view results are stored as entity tuples, the view annotations also rely on the entity manager annotations.
This example shows how to write the weighted average price example using annotations. To show more advanced features, create a new stock quote class, ComplexStockQuote:
import java.io.Serializable;

import com.ibm.websphere.objectgrid.streamquery.annotations.AccessTypeEnum;
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryColumn;
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;

@StreamQueryRelation(name = "stockQuote", accessType = AccessTypeEnum.FIELD)
public class ComplexStockQuote implements Serializable {
    private static final long serialVersionUID = 1L;

    @StreamQueryColumn
    private String issue;

    @StreamQueryColumn
    private float price;

    // Exposed to the stream SQL statement as the column "volume"
    @StreamQueryColumn(name = "volume")
    private int currentVolume;

    // Not annotated, so it is not part of the stream events
    private long quoteTime;

    public ComplexStockQuote(String issue, float price, int volume) {
        this.issue = issue;
        this.price = price;
        this.currentVolume = volume;
    }

    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o instanceof ComplexStockQuote) {
            ComplexStockQuote other = (ComplexStockQuote) o;
            return (issue == null ? other.issue == null : issue.equals(other.issue))
                    && price == other.price
                    && currentVolume == other.currentVolume;
        }
        return false;
    }

    // Added so that hashCode stays consistent with equals
    public int hashCode() {
        return issue == null ? 0 : issue.hashCode();
    }
}
The ComplexStockQuote class is similar to the StockQuote class, with the following differences:
- The issue, price, and currentVolume fields are annotated with @StreamQueryColumn, but the quoteTime field is not. This tells the ObjectGrid runtime to generate events for these three fields only. As a result, the stream SQL statement contains exactly three non-key columns, one for each annotated field. The quoteTime field is not used in stream events.
- The @StreamQueryColumn annotation on the currentVolume field sets the name attribute to volume. This tells the ObjectGrid stream query runtime to generate a column called volume (instead of currentVolume) in the stream SQL statement. In this way, you can alias fields to SQL column names.
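The field-to-column mapping described in the list above can be illustrated with plain Java reflection. The following is a minimal sketch only, not the ObjectGrid implementation: the Column annotation, the Quote class, and the columnNames method are hypothetical stand-ins invented for this illustration. It shows the general idea that only annotated fields become columns and that a name attribute, when present, replaces the field name.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class ColumnAliasSketch {

    // Hypothetical stand-in for a column annotation; NOT the ObjectGrid @StreamQueryColumn.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Column {
        String name() default ""; // empty means "use the field name"
    }

    // Hypothetical class mirroring the field layout of ComplexStockQuote.
    static class Quote {
        @Column private String issue;
        @Column private float price;
        @Column(name = "volume") private int currentVolume;
        private long quoteTime; // not annotated, so it yields no column
    }

    // Collects one column name per annotated field, applying the alias if one is set.
    public static List<String> columnNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            Column column = field.getAnnotation(Column.class);
            if (column == null) {
                continue; // unannotated fields are skipped
            }
            names.add(column.name().isEmpty() ? field.getName() : column.name());
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints the column names for the three annotated fields; quoteTime is absent.
        System.out.println(columnNames(Quote.class));
    }
}
```

In this sketch, the currentVolume field appears under the name volume, and quoteTime does not appear at all, which matches the behavior the list describes for the real annotations.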
Refer to the StreamQueryRelation API Documentation and the StreamQueryColumn API Documentation for more details.
Write an AveragePrice class to represent the view result. Remember that view results are stored in view maps as entity tuples. By providing this class, you tell the entity manager that each tuple can be projected into an AveragePrice object. The AveragePrice class follows:
import com.ibm.websphere.objectgrid.streamquery.annotations.AccessTypeEnum;
import com.ibm.websphere.objectgrid.streamquery.annotations.StreamQueryRelation;
import com.ibm.websphere.projector.annotations.Basic;
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;

@StreamQueryRelation(
    mapName = "last5MinuteAvgPrice",
    isStream = false,
    sql = "CREATE VIEW last5MinuteAvgPrice AS SELECT issue, totalAmount/totalVolume As avgPrice FROM "
        + "(SELECT issue, sum(amount) as totalAmount, sum(volume) As totalVolume FROM "
        + "(SELECT t, issue, volume, price*volume AS amount FROM stockQuote FETCH LATEST 5 MINUTES) "
        + "group by issue);",
    accessType = AccessTypeEnum.FIELD)
@Entity(name = "last5MinuteAvgPrice")
public class AveragePrice {
    private static final long serialVersionUID = 1L;

    // Tuple key
    @Id
    private String issue;

    // Mapped to the avgPrice column of the view SQL
    @Basic(alias = "avgPrice")
    protected float averagePrice;

    // Default constructor required by the entity manager
    public AveragePrice() {
    }

    public AveragePrice(String issue, float averagePrice) {
        this.issue = issue;
        this.averagePrice = averagePrice;
    }

    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o instanceof AveragePrice) {
            AveragePrice other = (AveragePrice) o;
            return (issue == null ? other.issue == null : issue.equals(other.issue))
                    && averagePrice == other.averagePrice;
        }
        return false;
    }

    public int hashCode() {
        // Null-safe, matching the null handling in equals
        return issue == null ? 0 : issue.hashCode();
    }
}
Annotate this class with the @StreamQueryRelation annotation. Set its map name, then set isStream to false to indicate that the class describes a stream query view rather than a stream. Set the sql statement. This SQL is fairly long because the intermediate view last5MinuteQuote and the view last5MinuteAvgPrice used in example 2 are combined into a single statement.
The class also carries the @Entity annotation, which indicates that it represents an entity as well. Use @Id and @Basic to indicate the tuple attributes. The averagePrice field is aliased to avgPrice, which is a column in the SQL statement. A default constructor, required by the entity manager, is provided as well.
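The nested SQL statement can be read from the inside out: keep the quotes from the latest five minutes, compute amount as price*volume for each quote, sum amount and volume per issue, and divide the two sums. The following plain Java sketch mirrors those stages on an in-memory list. It is an illustration under stated assumptions, not how the stream query engine evaluates the view: the Quote class, the last5MinuteAvgPrice method, the millisecond timestamps, and the exact window boundaries are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WeightedAverageSketch {

    // Hypothetical event type: one quote plus its arrival time in milliseconds.
    static final class Quote {
        final long timeMillis;
        final String issue;
        final double price;
        final int volume;

        Quote(long timeMillis, String issue, double price, int volume) {
            this.timeMillis = timeMillis;
            this.issue = issue;
            this.price = price;
            this.volume = volume;
        }
    }

    // Returns issue -> totalAmount / totalVolume over the five minutes ending at nowMillis.
    public static Map<String, Double> last5MinuteAvgPrice(List<Quote> quotes, long nowMillis) {
        long windowStart = nowMillis - 5 * 60 * 1000L;

        // issue -> {totalAmount, totalVolume}
        Map<String, double[]> totals = new LinkedHashMap<>();
        for (Quote q : quotes) {
            // Innermost SELECT: keep only quotes inside the window (boundary choice is an assumption)
            if (q.timeMillis <= windowStart || q.timeMillis > nowMillis) {
                continue;
            }
            double amount = q.price * q.volume;      // price*volume AS amount
            double[] t = totals.computeIfAbsent(q.issue, k -> new double[2]);
            t[0] += amount;                          // sum(amount) AS totalAmount
            t[1] += q.volume;                        // sum(volume) AS totalVolume
        }

        // Outermost SELECT: totalAmount/totalVolume AS avgPrice, one row per issue
        Map<String, Double> avgPrice = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : totals.entrySet()) {
            avgPrice.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return avgPrice;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Quote> quotes = new ArrayList<>();
        quotes.add(new Quote(now - 3000, "IBM", 95.00, 1000));
        quotes.add(new Quote(now - 2000, "IBM", 96.00, 2000));
        quotes.add(new Quote(now - 1000, "IBM", 100.00, 3000));
        System.out.println(last5MinuteAvgPrice(quotes, now));
    }
}
```

For the three IBM quotes in main, the sums are 587000 for amount and 6000 for volume, so the weighted average is roughly 97.83.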
Revise the stream query application sample to show how to use the EntityManager to get the tuple results. The StreamQueryApp3 class follows:
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.em.EntityManager;
import com.ibm.websphere.objectgrid.streamquery.StreamQuerySet;

public class StreamQueryApp3 {
    public static void main(String[] args) throws Exception {
        ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();
        ObjectGrid og = ogManager.createObjectGrid("og1");
        og.defineMap("stockQuote");
        og.defineMap("last5MinuteAvgPrice");

        // Map names and SQL come from the annotations on the value classes
        StreamQuerySet sqSet = og.createStreamQuerySet();
        sqSet.addStreamMetadata().setValueClass(ComplexStockQuote.class);
        sqSet.addViewMetadata().setValueClass(AveragePrice.class);
        sqSet.setDeployed(true);

        Session session = og.getSession();
        ObjectMap streamMap = session.getMap("stockQuote");
        ObjectMap viewMap = session.getMap("last5MinuteAvgPrice");

        // Each insert or update on the stream map produces a stream event
        streamMap.insert("IBM", new ComplexStockQuote("IBM", 95.00f, 1000));
        streamMap.update("IBM", new ComplexStockQuote("IBM", 96.00f, 2000));
        streamMap.update("IBM", new ComplexStockQuote("IBM", 100.00f, 3000));
        streamMap.insert("CSCO", new ComplexStockQuote("CSCO", 6.00f, 10000));
        streamMap.update("CSCO", new ComplexStockQuote("CSCO", 6.20f, 2500));

        // Give the view time to be computed
        Thread.sleep(1000);

        // Read the view results as AveragePrice entities
        EntityManager em = session.getEntityManager();
        AveragePrice ibmAvgPrice = (AveragePrice) em.find(AveragePrice.class, "IBM");
        AveragePrice cscoAvgPrice = (AveragePrice) em.find(AveragePrice.class, "CSCO");
        System.out.println("IBM Average Price=" + ibmAvgPrice.averagePrice);
        System.out.println("CSCO Average Price=" + cscoAvgPrice.averagePrice);

        sqSet.setDeployed(false);
    }
}
Notice that annotations simplify this application: you do not need to set map names or SQL statements for the stream and the view. After inserting the data and sleeping for 1 second, the application uses the EntityManager API to find the results in the view map, then prints the prices to standard output:
IBM Average Price=97.833336
CSCO Average Price=6.04
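The sample waits a fixed second before it reads the view. If a fixed delay is not acceptable in your environment, one alternative is to poll for the result with a timeout. The helper below is a generic sketch, not part of the eXtreme Scale API: the PollSketch class and the pollUntilPresent method are hypothetical, and the sketch assumes that the lookup returns null until the result is available.

```java
import java.util.function.Supplier;

class PollSketch {

    // Repeats the lookup until it returns a non-null value or the timeout elapses.
    // Returns null on timeout or if the waiting thread is interrupted.
    public static <T> T pollUntilPresent(Supplier<T> lookup, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            T value = lookup.get();
            if (value != null) {
                return value;
            }
            if (System.nanoTime() - deadline >= 0) {
                return null; // timed out without a result
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return null;
            }
        }
    }

    public static void main(String[] args) {
        long readyAt = System.currentTimeMillis() + 200;
        // Simulated lookup that has no result for the first 200 ms
        String result = pollUntilPresent(
                () -> System.currentTimeMillis() >= readyAt ? "ready" : null, 2000, 20);
        System.out.println(result);
    }
}
```

In the sample application, the supplier could wrap a call such as em.find(AveragePrice.class, "IBM"), assuming that the call signals a missing tuple by returning null and does not throw a checked exception.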
For a more detailed description of stream query annotations, refer to the Stream query annotation reference.
© Copyright IBM Corporation 2007,2009. All Rights Reserved.