See the WebSphere eXtreme Scale Wiki for links to eXtreme Scale Version 7.0 documentation.
If you log in
with your developerWorks ID, you can leave comments and feedback for the development team.
You might be familiar with how to create an application to run stream query using programming APIs. ObjectGrid also supports configured stream queries in the ObjectGrid xml file. The configured stream queries are especially useful in pre-configuring stream queries when the ObjectGrid is distributed. You can configure stream queries for a distributed ObjectGrid and these stream queries will be deployed when the ObjectGrid is activated on the server.
This example show how to modify the StreamQueryApp1 example to use an ObjectGrid XML file to configure the stream query. Start by defining a very simple ObjectGrid xml without stream queries:
<?xml version="1.0" encoding="UTF-8"?>
<objectGridConfig xmlns:xsi="http:
xsi:schemaLocation="http: xmlns="http://ibm.com/ws/objectgrid/config">
<objectGrids>
<objectGrid name="og1">
<backingMap name="stockQuote" readOnly="false" copyKey="true"/>
<backingMap name="last5MinuteAvgPrice" readOnly="false" copyKey="false"/>
</objectGrid>
</objectGrids>
</objectGridConfig>
This ObjectGrid XML contains one ObjectGrid og1 and two maps: stockQuote and last5MinuteAvgPrice. However, it is not stream query-aware at this point.
Add and set a stream query to this ObjectGridand, then configure the stream and view for this stream query set.
<?xml version="1.0" encoding="UTF-8"?>
<objectGridConfig xmlns:xsi="http: xsi:schemaLocation="http://ibm.com/ws/objectgrid/config ../objectGrid.xsd" xmlns="http://ibm.com/ws/objectgrid/config">
<objectGrids>
<objectGrid name="og1">
<backingMap name="stockQuote" readOnly="false" copyKey="true" streamRef="stockQuote"/>
<backingMap name="last5MinuteAvgPrice" readOnly="false" copyKey="false" viewRef="last5MinuteAvgPrice"/>
<streamQuerySet name="stockQuoteSQS">
<stream name="stockQuote" valueClass="com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote" >
</stream>
<view name="last5MinuteAvgPrice"
valueClass="com.ibm.ws.objectgrid.streamquery.sample.guide.AveragePrice"
sql="CREATE VIEW last5MinuteAvgPrice AS SELECT issue, avg(price) as avgPrice FROM (SELECT * FROM stockQuote FETCH LATEST 5 MINUTES) group by issue;"/>
</streamQuerySet>
</objectGrid>
</objectGrids>
</objectGridConfig>
First, add a streamRef attribute to the stockQuote backingMap configuration. This tells the ObjectGrid that this map is a stream source map; any insert or update to this map will be converted into a streaming event to the stream query engine.
Next, add a viewRef sttribute to the backingMap configuratoin for last5MinuteAvgPrice map. This tells the ObjectGrid that this map is a view map; the view output from the stream query engine is converted into ObjectGrid entity tuple format and then inserted/updated to this view map.
In the streamQuerySet configuration, define a streamQuerySet with the name stockQuoteSQS. For this stream query set, define one stream and one view. You can see that the configuration parameters correspond to the attributes you set in the programming example.
Now, the application can be simplified since you do not need to create an ObjectGrid and stream query set programmatically.
import java.net.URL;
import java.util.List;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.streamquery.StreamQuerySet;
import com.ibm.websphere.projector.Tuple;
import com.ibm.websphere.projector.md.EntityMetadata;
import com.ibm.websphere.projector.md.TupleMetadata;
public class StreamQueryApp1XML {
public static void main(String\[\] args) throws Exception {
ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();
List ogs = ogManager.createObjectGrids(new URL("file:etc/test/smile/sample/streamQueryApp1.xml"));
ObjectGrid og = (ObjectGrid) ogs.get(0);
StreamQuerySet sqSet = (StreamQuerySet) og.getStreamQuerySets().get(0);
sqSet.setDeployed(true);
Session session = og.getSession();
ObjectMap streamMap = session.getMap("stockQuote");
ObjectMap viewMap = session.getMap("last5MinuteAvgPrice");
streamMap.insert("IBM", new StockQuote("IBM", 95.00f, 1000));
streamMap.update("IBM", new StockQuote("IBM", 96.00f, 2000));
streamMap.update("IBM", new StockQuote("IBM", 100.00f, 3000));
streamMap.insert("CSCO", new StockQuote("CSCO", 6.00f, 10000));
streamMap.update("CSCO", new StockQuote("CSCO", 6.20f, 2500));
Thread.sleep(1000);
EntityMetadata emd = viewMap.getEntityMetadata();
TupleMetadata keyMD = emd.getKeyMetadata();
Tuple ibmKey = keyMD.createTuple();
ibmKey.setAttribute(0, "IBM");
Tuple ibmValue = (Tuple) viewMap.get(ibmKey);
System.out.println("Average price in the latest 5 minutes for IBM: " + ibmValue.getAttribute(0));
Tuple cscoKey = keyMD.createTuple();
cscoKey.setAttribute(0, "CSCO");
Tuple cscoValue = (Tuple) viewMap.get(cscoKey);
System.out.println("Average price in the latest 5 minutes for CSCO: " + cscoValue.getAttribute(0));
sqSet.setDeployed(false);
}
}
The same output is displayed if you run this example.
For a detailed reference of each fo the configuration options, see the Local ObjectGrid configuration topic.
© Copyright IBM Corporation 2007,2009. All Rights Reserved.