Previous examples have shown how to create stream queries programmatically or with XML configuration, and how JDK 5.0 annotations are used to configure stream query metadata. All the stream query examples in the previous two sections run in local ObjectGrid instances. This section examines how to run stream queries in distributed ObjectGrid instances.
Distributed stream queries are configured in the server-side ObjectGrid. When the ObjectGrid is activated in the primary server, the stream query set is deployed and activated. The inputs to the stream query are the inserts and updates to the server-side stream maps, and the outputs of the stream query are reflected in the server-side view maps.
How can a client access the data in the server-side view maps? ObjectGrid provides several ways:
- Using the normal ObjectMap APIs to get the data
- Using the ObjectGridEventListener to listen for the map changes
- Using the ReplicationMapListener to listen for the map changes
The first approach, using the normal ObjectMap APIs, requires that the client knows the keys to be accessed. This is typically undesirable because the view result keys are usually unknown and the results are updated continuously. The second and third approaches use the push model, which is more practical in this case. The server-side view map is replicated to the near cache on the client side, so you can use listeners to listen for the data changes.
The following example extends the previous example to a distributed environment, and overrides the ObjectGridEventListener plug-in on the client side to listen for data changes.
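The difference between the pull and push models can be sketched in plain Java. This is a minimal illustration only: PushModelSketch is a hypothetical stand-in for a replicated view map and does not use or represent any ObjectGrid class. The get method requires the caller to know the key, whereas a registered listener receives every change without knowing the keys in advance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a replicated view map; not an ObjectGrid class. */
final class PushModelSketch {
    private final Map<String, Double> entries = new HashMap<>();
    private final List<BiConsumer<String, Double>> listeners = new ArrayList<>();

    /** Push model: register a callback once, then receive every change. */
    void addListener(BiConsumer<String, Double> listener) {
        listeners.add(listener);
    }

    /** Pull model: the caller must already know the key. */
    Double get(String key) {
        return entries.get(key);
    }

    /** Simulates the server side writing a view result. */
    void put(String key, Double value) {
        entries.put(key, value);
        for (BiConsumer<String, Double> listener : listeners) {
            listener.accept(key, value);
        }
    }

    public static void main(String[] args) {
        PushModelSketch view = new PushModelSketch();
        List<String> seen = new ArrayList<>();
        // The listener learns about "IBM" and "CSCO" without asking for them.
        view.addListener((key, value) -> seen.add(key + "=" + value));
        view.put("IBM", 95.0);
        view.put("CSCO", 6.0);
        System.out.println(seen);
    }
}
```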
Sample XML configuration
Use the same ObjectGrid XML file as in the previous example: a stream query set containing one stream, stockQuote, and one view, last5MinuteAvgPrice.
<?xml version="1.0" encoding="UTF-8"?>
<objectGridConfig xmlns:xsi="http:
xsi:schemaLocation="http: xmlns="http://ibm.com/ws/objectgrid/config">
<objectGrids>
<objectGrid name="og1">
<backingMap name="stockQuote" readOnly="false" copyKey="true" streamRef="stockQuote"/>
<backingMap name="last5MinuteAvgPrice" readOnly="false" copyKey="false" viewRef="last5MinuteAvgPrice"/>
<streamQuerySet name="stockQuoteSQS">
<stream name="stockQuote" valueClass="com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote" >
</stream>
<view name="last5MinuteAvgPrice"
sql="CREATE VIEW last5MinuteAvgPrice AS SELECT issue, avg(price) as totalVolume FROM (SELECT * FROM stockQuote FETCH LATEST 5 MINUTES) group by issue;"/>
</streamQuerySet>
</objectGrid>
</objectGrids>
</objectGridConfig>
Create a deployment policy file to deploy this ObjectGrid as follows:
<?xml version="1.0" encoding="UTF-8"?>
<deploymentPolicy xmlns:xsi="http:
xsi:schemaLocation="http:
xmlns="http:>
<objectgridDeployment objectgridName="og1">
<mapSet name="mapSet1" numberOfPartitions="1" minSyncReplicas="0" maxSyncReplicas="2" maxAsyncReplicas="1">
<map ref="stockQuote"/>
<map ref="last5MinuteAvgPrice"/>
</mapSet>
</objectgridDeployment>
</deploymentPolicy>
This deployment policy deploys one ObjectGrid, og1, which has one map set. The map set has one partition and a minimum of zero synchronous replicas, which means that no replica is required. The map set contains two maps, stockQuote and last5MinuteAvgPrice, which correspond to the maps defined in the ObjectGrid XML file.
As mentioned previously, a stream query set is a deployment unit. That is, all maps defined in a stream query set must be deployed in one map set to guarantee that they are deployed in the same server. If the maps used in one stream query set are deployed in multiple map sets, an error results.
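The single-map-set rule can be checked before deployment with a few lines of plain Java. The following is a minimal sketch, assuming the map names have already been read from the two XML files. MapSetCheck and inSingleMapSet are hypothetical names and are not part of the ObjectGrid API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical pre-deployment check; not part of the ObjectGrid API. */
final class MapSetCheck {
    /**
     * Returns true when every map used by the stream query set
     * belongs to one and the same map set.
     *
     * @param queryMaps names of the stream and view maps in the stream query set
     * @param mapSets   map set name mapped to the names of the maps it contains
     */
    static boolean inSingleMapSet(List<String> queryMaps, Map<String, List<String>> mapSets) {
        Set<String> owners = new HashSet<>();
        for (String mapName : queryMaps) {
            String owner = null;
            for (Map.Entry<String, List<String>> entry : mapSets.entrySet()) {
                if (entry.getValue().contains(mapName)) {
                    owner = entry.getKey();
                }
            }
            if (owner == null) {
                return false; // the map is not deployed in any map set
            }
            owners.add(owner);
        }
        return owners.size() == 1;
    }

    public static void main(String[] args) {
        List<String> queryMaps = Arrays.asList("stockQuote", "last5MinuteAvgPrice");

        Map<String, List<String>> good = new HashMap<>();
        good.put("mapSet1", Arrays.asList("stockQuote", "last5MinuteAvgPrice"));

        Map<String, List<String>> bad = new HashMap<>();
        bad.put("mapSet1", Arrays.asList("stockQuote"));
        bad.put("mapSet2", Arrays.asList("last5MinuteAvgPrice"));

        System.out.println(inSingleMapSet(queryMaps, good)); // true
        System.out.println(inSingleMapSet(queryMaps, bad));  // false
    }
}
```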
Useful Information
Distributed stream queries work for both the dynamic and the static distributed ObjectGrid models. If you use the static model, that is, you use a cluster XML file to configure ObjectGrid servers, you can define the same map set in cluster.xml.
Sample code
The distributed stream query application that runs on the client side is similar to the local stream query application shown previously, with a few differences. Here is the client application DistStreamQueryApp1XML:
package com.ibm.ws.objectgrid.streamquery.sample.guide.dist;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.ibm.websphere.objectgrid.ClientClusterContext;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridManager;
import com.ibm.websphere.objectgrid.ObjectGridManagerFactory;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.config.ObjectGridConfigFactory;
import com.ibm.websphere.objectgrid.config.ObjectGridConfiguration;
import com.ibm.websphere.objectgrid.config.Plugin;
import com.ibm.websphere.objectgrid.config.PluginType;
import com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote;
public class DistStreamQueryApp1XML {

    public static void main(String[] args) throws Exception {
        ObjectGridManager ogManager = ObjectGridManagerFactory.getObjectGridManager();
        ogManager.setTraceFileName("logs/client.log");
        ogManager.setTraceSpecification("ObjectGrid*=all=enabled");

        // Register the client-side listener override before connecting.
        addOGELPlugin(ogManager);

        // Connect to the catalog server and get the client-side ObjectGrid.
        ClientClusterContext ccContext = ogManager.connect("localhost:2809", null, null);
        ObjectGrid og = ogManager.getObjectGrid(ccContext, "og1");
        Session session = og.getSession();

        ObjectMap streamMap = session.getMap("stockQuote");
        // getViewMap must be called so that the view map is replicated to the client.
        ObjectMap viewMap = session.getViewMap("last5MinuteAvgPrice");

        // Clear any entries left over from an earlier run.
        streamMap.remove("IBM");
        streamMap.remove("CSCO");

        // Feed stock quotes into the stream map.
        streamMap.insert("IBM", new StockQuote("IBM", 95.00f, 1000));
        streamMap.update("IBM", new StockQuote("IBM", 96.00f, 2000));
        streamMap.update("IBM", new StockQuote("IBM", 100.00f, 3000));
        streamMap.insert("CSCO", new StockQuote("CSCO", 6.00f, 10000));
        streamMap.update("CSCO", new StockQuote("CSCO", 6.20f, 2500));

        // Give the listener time to receive the replicated changes.
        Thread.sleep(5000);
        System.out.println("Exit");
    }

    /**
     * Override the ObjectGridEventListener plugin on the client side
     * @param ogManager
     */
    private static void addOGELPlugin(ObjectGridManager ogManager) {
        Map overrideMap = new HashMap();
        List objectGridConfigList = new ArrayList();
        ObjectGridConfiguration ogConfig = ObjectGridConfigFactory.createObjectGridConfiguration("og1");
        Plugin ogel = ObjectGridConfigFactory.createPlugin(PluginType.OBJECTGRID_EVENT_LISTENER,
                "com.ibm.ws.objectgrid.streamquery.sample.guide.dist.SingleViewPrintListener");
        ogConfig.addPlugin(ogel);
        objectGridConfigList.add(ogConfig);
        overrideMap.put("defaultDomain", objectGridConfigList);
        ogManager.setOverrideObjectGridConfigurations(overrideMap);
    }
}
The application first calls the addOGELPlugin method, which configures SingleViewPrintListener as the ObjectGridEventListener plug-in override on the client side. The SingleViewPrintListener listener simply prints the data changes from the server-side maps.
It then calls ObjectGridManager.connect to connect to the catalog server. The host name is localhost, and the port number is 2809.
The Session.getViewMap method is a special method that retrieves the stream query view map on the client side. Calling it tells the server to start replicating the data to the near cache. You must call this method; otherwise, the data in the server-side map is not replicated to the client.
Following is the SingleViewPrintListener class:
package com.ibm.ws.objectgrid.streamquery.sample.guide.dist;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.plugins.LogElement;
import com.ibm.websphere.objectgrid.plugins.LogSequence;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventListener;
public class SingleViewPrintListener implements ObjectGridEventListener {

    // Optional filter: when set, only changes to this map are printed.
    private String mName = null;

    public SingleViewPrintListener() {
    }

    public SingleViewPrintListener(String mapName) {
        this.mName = mapName;
    }

    public void destroy() {
    }

    public void initialize(Session session) {
    }

    public void transactionBegin(String txid, boolean isWriteThroughEnabled) {
    }

    public void transactionEnd(String txid, boolean isWriteThroughEnabled, boolean committed, Collection changes) {
        // Each LogSequence holds the changes made to one map in this transaction.
        Iterator iter = changes.iterator();
        while (iter.hasNext()) {
            LogSequence logSeq = (LogSequence) iter.next();
            if (logSeq.isDirty()) {
                Iterator logSeqIter = logSeq.getAllChanges();
                String mapName = logSeq.getMapName();
                // Skip maps other than the one being filtered on, if any.
                if (mName != null && (!mName.equals(mapName))) {
                    continue;
                }
                while (logSeqIter.hasNext()) {
                    LogElement logElement = (LogElement) logSeqIter.next();
                    switch (logElement.getType().getCode()) {
                    case LogElement.CODE_INSERT:
                        System.out.println(mapName + ": insert " + logElement.getCacheEntry().getKey() + "-->"
                                + logElement.getCacheEntry().getCommittedValue());
                        break;
                    case LogElement.CODE_UPDATE:
                        System.out.println(mapName + ": update " + logElement.getCacheEntry().getKey() + "-->"
                                + logElement.getCacheEntry().getCommittedValue());
                        break;
                    case LogElement.CODE_DELETE:
                        System.out.println(mapName + ": delete " + logElement.getCacheEntry().getKey() + "-->"
                                + logElement.getBeforeImage());
                        break;
                    default:
                        break;
                    }
                }
            }
        }
    }
}
Sample execution
Start the catalog server using the following script. See Starting a catalog service to read more about launching these processes.
startOgServer.bat catalogServer
Next, launch a container server using the following script:
startOgServer.bat c0 -objectGridFile ../xml/StreamQueryApp1.xml -deploymentPolicyFile ../xml/StreamQueryDP.xml
-catalogServiceEndpoints localhost:2809
To launch the server successfully, you must add the JAR file that contains the StockQuote and AveragePrice classes to the classpath. You can edit startOgServer.bat to do that. See Starting ObjectGrid server processes for more information about launching container servers.
After the server is started, launch the client using the following command:
java -classpath ../lib/ogclient.jar;../lib/ogstreamquery.jar;../applib/sqsample.jar com.ibm.ws.objectgrid.streamquery.sample.guide.dist.DistStreamQueryApp1XML
The output of the program is:
1 stockQuote: insert IBM-->com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote@11e9b
2 stockQuote: update IBM-->com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote@12284
3 stockQuote: update IBM-->com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote@12670
4 stockQuote: insert CSCO-->com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote@1fdbf2
5 stockQuote: update CSCO-->com.ibm.ws.objectgrid.streamquery.sample.guide.StockQuote@1fbea6
6 last5MinuteAvgPrice: update Tuple{attrs:[IBM], assocs:[none], parid:0, fixed:false, hc=0}-->Tuple{attrs:[95.0], assocs:[none], parid:-1, fixed:false, hc=0}
7 last5MinuteAvgPrice: update Tuple{attrs:[IBM], assocs:[none], parid:0, fixed:false, hc=0}-->Tuple{attrs:[95.5], assocs:[none], parid:-1, fixed:false, hc=0}
8 last5MinuteAvgPrice: update Tuple{attrs:[IBM], assocs:[none], parid:0, fixed:false, hc=0}-->Tuple{attrs:[97.0], assocs:[none], parid:-1, fixed:false, hc=0}
9 last5MinuteAvgPrice: update Tuple{attrs:[CSCO], assocs:[none], parid:0, fixed:false, hc=0}-->Tuple{attrs:[6.0], assocs:[none], parid:-1, fixed:false, hc=0}
10 last5MinuteAvgPrice: update Tuple{attrs:[CSCO], assocs:[none], parid:0, fixed:false, hc=0}-->Tuple{attrs:[6.1], assocs:[none], parid:-1, fixed:false, hc=0}
This output shows all the map changes to ObjectGrid og1. After the line number, the first word of each line is the name of the map. The printout includes changes to both the stockQuote map and the last5MinuteAvgPrice map.
In the view result map last5MinuteAvgPrice, notice that the first record for IBM on line 6 has an averagePrice attribute value of 95.0. The reason is that at the time the average price was calculated, IBM stock had only one price, 95. In the third record on line 8, the average price is 97.0, which is the expected unweighted average of the three prices 95, 96, and 100.
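The IBM values on lines 6 through 8 can be reproduced with a short calculation. The following is a minimal sketch, assuming the view is recomputed after every stream update and that all quotes fall inside the five-minute window. RunningAverageSketch is a hypothetical helper, not part of the sample application or the stream query engine.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mirrors the unweighted average in the view. */
final class RunningAverageSketch {
    /** Returns the unweighted mean after each successive price. */
    static List<Double> runningAverages(double... prices) {
        List<Double> result = new ArrayList<>();
        double sum = 0.0;
        for (int i = 0; i < prices.length; i++) {
            sum += prices[i];
            result.add(sum / (i + 1)); // volume is ignored: the average is unweighted
        }
        return result;
    }

    public static void main(String[] args) {
        // IBM quotes from the sample: 95.00, 96.00, 100.00
        System.out.println(runningAverages(95.00, 96.00, 100.00)); // [95.0, 95.5, 97.0]
    }
}
```

The same calculation for the two CSCO quotes, 6.00 and 6.20, gives 6.0 and then approximately 6.1, which matches lines 9 and 10.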
Your results might be different. The reason is that messages take time to travel between producers and consumers, so the state in views can lag behind the true state. However, the final results should be the same. Refer to Eventual correctness in the Stream query engine language tutorial for more details.
This example shows that, from the stream query point of view, there is little difference between a local stream query application and a distributed stream query application.
Partition consideration
An ObjectGrid map can be partitioned across N servers. When the stream maps used by the stream query are partitioned, the stream query set is deployed in every partition. Each partition generates its view results from the data in that partition only. The view results are then replicated to the near cache on the client side.
Because the view results of all partitions are replicated to the same near cache, it is important to make sure that the keys of the view results in different partitions are different. If this condition does not hold, data from different partitions can overwrite each other in the client near cache.
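The overwrite risk can be illustrated with plain Java maps. The following is a minimal simulation under stated assumptions: each inner map stands for the view results of one partition, and merge copies them into a single map the way a shared near cache would receive them. NearCacheCollisionSketch and its methods are hypothetical and do not use any ObjectGrid class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical simulation of several partitions feeding one near cache. */
final class NearCacheCollisionSketch {
    /** Copies every partition's view results into one map, in partition order. */
    static Map<String, Double> merge(List<Map<String, Double>> partitions) {
        Map<String, Double> nearCache = new HashMap<>();
        for (Map<String, Double> partition : partitions) {
            nearCache.putAll(partition); // an equal key from a later partition overwrites
        }
        return nearCache;
    }

    /** Returns the view keys that appear in more than one partition. */
    static Set<String> collidingKeys(List<Map<String, Double>> partitions) {
        Set<String> seen = new HashSet<>();
        Set<String> collisions = new HashSet<>();
        for (Map<String, Double> partition : partitions) {
            for (String key : partition.keySet()) {
                if (!seen.add(key)) {
                    collisions.add(key);
                }
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        Map<String, Double> partition0 = new HashMap<>();
        partition0.put("IBM", 97.0);
        Map<String, Double> partition1 = new HashMap<>();
        partition1.put("IBM", 101.0); // same view key as in partition 0
        partition1.put("CSCO", 6.1);

        List<Map<String, Double>> partitions = new ArrayList<>();
        partitions.add(partition0);
        partitions.add(partition1);

        System.out.println(collidingKeys(partitions));    // [IBM]
        System.out.println(merge(partitions).get("IBM")); // 101.0, the value 97.0 is lost
    }
}
```

In this sketch the collision disappears when each view key can only be produced by one partition, for example when the stream is partitioned on the same attribute that the view groups by.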
© Copyright IBM Corporation 2007,2009. All Rights Reserved.