import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.streamquery.StreamQuerySet;
import com.ibm.websphere.projector.Tuple;
import com.ibm.websphere.projector.md.EntityMetadata;
import com.ibm.websphere.projector.md.TupleMetadata;
public class StreamQueryApp2 {
public static void main(String\[\] args) throws Exception {
ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();
ObjectGrid og = ogManager.createObjectGrid("og1");
og.defineMap("stockQuote");
og.defineMap("last5MinuteAvgPrice");
StreamQuerySet sqSet = og.createStreamQuerySet();
sqSet.addStreamMetadata().setValueClass(StockQuote.class);
sqSet
.addViewMetadata()
.setSql(
"CREATE VIEW last5MinuteQuote AS SELECT t, issue, volume, price*volume AS amount FROM stockQuote FETCH LATEST 5 MINUTES;");
sqSet
.addViewMetadata()
.setMapName("last5MinuteAvgPrice")
.setSql(
"CREATE VIEW last5MinuteAvgPrice AS SELECT issue, totalAmount/totalVolume As avgPrice from "
\+"(SELECT issue, sum(amount) as totalAmount, sum(volume) As totalVolume FROM "
+ "last5MinuteQuote group by issue);");
sqSet.setDeployed(true);
Session session = og.getSession();
ObjectMap streamMap = session.getMap("stockQuote");
ObjectMap viewMap = session.getMap("last5MinuteAvgPrice");
streamMap.insert("IBM", new StockQuote("IBM", 95.00f, 1000));
streamMap.update("IBM", new StockQuote("IBM", 96.00f, 2000));
streamMap.update("IBM", new StockQuote("IBM", 100.00f, 3000));
streamMap.insert("CSCO", new StockQuote("CSCO", 6.00f, 10000));
streamMap.update("CSCO", new StockQuote("CSCO", 6.20f, 2500));
Thread.sleep(1000);
System.out.println("check the data");
EntityMetadata emd = viewMap.getEntityMetadata();
TupleMetadata keyMD = emd.getKeyMetadata();
Tuple ibmKey = keyMD.createTuple();
ibmKey.setAttribute(0, "IBM");
Tuple ibmValue = (Tuple) viewMap.get(ibmKey);
System.out.println("Average price in the latest 5 minutes for IBM: " + ibmValue.getAttribute(0));
Tuple cscoKey = keyMD.createTuple();
cscoKey.setAttribute(0, "CSCO");
Tuple cscoValue = (Tuple) viewMap.get(cscoKey);
System.out.println("Average price in the latest 5 minutes for CSCO: " + cscoValue.getAttribute(0));
sqSet.setDeployed(false);
}
}