ObjectGrid can provide a first-in, first-out (FIFO) queue-like capability for all maps. ObjectGrid tracks the insertion order for all maps. A client can ask a map for the next unlocked entry in insertion order and lock it. This allows multiple clients to consume entries from the map efficiently. The following code fragment shows how this capability is used:
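Session session = ...;
ObjectMap map = session.getMap("xxx");
boolean timeToStop = false;
while (!timeToStop)
{
    session.begin();
    // Wait up to 5000 ms for the next unlocked entry, in insertion order, and lock it.
    Object msgKey = map.getNextKey(5000);
    if (msgKey == null)
    {
        // Timed out: end this transaction and start listening again.
        session.rollback();
        continue;
    }
    Message m = (Message) map.get(msgKey);
    ...
    // Remove the processed entry from the map before committing.
    map.remove(msgKey);
    session.commit();
}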
This fragment shows a client looping to process entries from the map until timeToStop is set. Each iteration starts a transaction and then calls ObjectMap.getNextKey(5000), which returns the key of the next available unlocked entry and locks it. If no entry becomes available within 5000 milliseconds, the call returns null, and the fragment rolls back the transaction and tries again.
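The locking and timeout semantics described above can be illustrated with a small in-memory model. This is only a sketch built on the standard Java library; it is not how ObjectGrid is implemented, and the class FifoMapSketch and its methods insert, removeAndUnlock, and unlock are hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of a FIFO map with per-entry locks. NOT the ObjectGrid implementation. */
final class FifoMapSketch<K, V> {
    // LinkedHashMap iterates in insertion order, standing in for insertion-order tracking.
    private final Map<K, V> entries = new LinkedHashMap<>();
    // Keys currently held by a consumer.
    private final Set<K> locked = new HashSet<>();

    synchronized void insert(K key, V value) {
        entries.put(key, value);
        notifyAll(); // wake consumers waiting in getNextKey
    }

    /** Returns and locks the oldest unlocked key, or null if none appears within the timeout. */
    synchronized K getNextKey(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            for (K key : entries.keySet()) {
                if (locked.add(key)) { // add() returns true only if the key was not locked yet
                    return key;
                }
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null; // timed out
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    synchronized V get(K key) {
        return entries.get(key);
    }

    /** Models remove followed by commit: the entry disappears and its lock is released. */
    synchronized void removeAndUnlock(K key) {
        entries.remove(key);
        locked.remove(key);
    }

    /** Models rollback: the entry stays in the map and becomes available again. */
    synchronized void unlock(K key) {
        locked.remove(key);
        notifyAll();
    }

    public static void main(String[] args) {
        FifoMapSketch<String, String> queue = new FifoMapSketch<>();
        queue.insert("msg-1", "hello");
        queue.insert("msg-2", "world");
        String key;
        while ((key = queue.getNextKey(100)) != null) {
            System.out.println(key + " -> " + queue.get(key));
            queue.removeAndUnlock(key);
        }
    }
}
```

In this model, two consumers that call getNextKey at the same time receive different keys, because the first caller locks the oldest entry and the second caller skips it.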
Local mode
If the application is using a local core, that is, it is not a client, then the mechanism works as described previously.
Client mode
If the Java virtual machine (JVM) is a client, the client initially connects to a random partition primary. If no work is there, the client moves to the next partition to look for work. It either finds a partition with entries or loops around to the initial random partition. If it loops around, it returns null to the application. If it finds a partition whose map has entries, it consumes entries from that partition until no entries are available for the timeout period. After the timeout passes, null is returned. Therefore, when null is returned and a partitioned map is used, it is prudent to start a new transaction and resume listening. The previous code fragment has this behavior.
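The partition search described above can be pictured as a wrap-around scan. The following is a minimal sketch using only the standard Java library; it ignores timeouts and networking, and the class PartitionScanSketch and the method findPartitionWithWork are hypothetical names, not part of the eXtreme Scale API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Random;

/** Toy model of a client looking for a partition that has queued entries. */
final class PartitionScanSketch {

    /**
     * Starts at the given partition and moves to the next one, wrapping around,
     * until a partition with entries is found. Returns -1 after a full loop,
     * which corresponds to null being returned to the application.
     */
    static int findPartitionWithWork(List<Deque<String>> partitions, int start) {
        int count = partitions.size();
        for (int i = 0; i < count; i++) {
            int candidate = (start + i) % count;
            if (!partitions.get(candidate).isEmpty()) {
                return candidate;
            }
        }
        return -1; // looped around to the starting partition without finding work
    }

    public static void main(String[] args) {
        List<Deque<String>> partitions = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            partitions.add(new ArrayDeque<>());
        }
        partitions.get(2).add("event-1");

        // The starting partition is chosen at random, as in the description above.
        int start = new Random().nextInt(partitions.size());
        int found = findPartitionWithWork(partitions, start);
        System.out.println("started at " + start + ", found work in partition " + found);
    }
}
```

Because the scan can finish without finding work even when entries arrive moments later, an application that receives null should start a new transaction and call getNextKey again.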
Example
When running as a client and a key is returned, that transaction is now bound to the partition with the entry for that key. If you do not want to update any other maps during that transaction, then there is not a problem. If you do want to update, then you can only update maps from the same partition as the map from which you got the key.
So, the entry returned from getNextKey needs to give the application a way to find relevant data in that partition. For example, suppose you have two maps: one for events and another for the jobs that the events affect. You define the two maps with the following entities:
package tutorial.fifo;

import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;

@Entity
public class Job
{
    @Id String jobId;
    int jobState;
}

package tutorial.fifo;

import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
import com.ibm.websphere.projector.annotations.OneToOne;

@Entity
public class JobEvent
{
    @Id String eventId;
    @OneToOne Job job;
}
The job has an ID and a state, which is an integer. Suppose you want to increment the state whenever an event arrives. The events are stored in the JobEvent Map. Each entry has a reference to the job that the event concerns. The code for the listener that does this looks like the following:
package tutorial.fifo;

import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.em.EntityManager;

public class JobEventListener
{
    boolean stopListening;

    public synchronized void stopListening()
    {
        stopListening = true;
    }

    synchronized boolean isStopped()
    {
        return stopListening;
    }

    public void processJobEvents(Session session)
        throws ObjectGridException
    {
        EntityManager em = session.getEntityManager();
        ObjectMap jobEvents = session.getMap("JobEvent");
        while (!isStopped())
        {
            em.getTransaction().begin();
            // Wait up to 5000 ms for the key of the next unlocked event.
            Object jobEventKey = jobEvents.getNextKey(5000);
            if (jobEventKey == null)
            {
                // Timed out: roll back and resume listening in a new transaction.
                em.getTransaction().rollback();
                continue;
            }
            JobEvent event = (JobEvent) em.find(JobEvent.class, jobEventKey);
            // Process the event; here, just increment the state of its job.
            event.job.jobState++;
            em.getTransaction().commit();
        }
    }
}
The listener is started on a thread by the application. The processJobEvents method runs on that thread until someone calls the stopListening method. The loop blocks waiting for a key from the JobEvent Map and then uses the EntityManager to access the event object, dereference it to the job, and increment the state.
There are some non-obvious things happening here. The EntityManager API does not have a getNextKey method, but ObjectMap does. So, the code uses the ObjectMap for JobEvent to get the key. If a map is used with entities, then it no longer stores objects. Instead, it stores Tuples: a Tuple object for the key and a Tuple object for the value. The EntityManager.find method accepts a Tuple for the key.
The code to create an event looks like the following:
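em.getTransaction().begin();
Job job = (Job) em.find(Job.class, "Job Key");
JobEvent event = new JobEvent();
// Any unique identifier works as the event ID; a random UUID is used here.
event.eventId = java.util.UUID.randomUUID().toString();
event.job = job;
em.persist(event);
em.getTransaction().commit();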
You find the job for the event, construct an event, point it to the job, insert it in the JobEvent Map and commit the transaction.
Loaders and FIFO Maps
If you want to back a map used as a FIFO queue with a Loader, then you might need to do some additional work. If the order of the entries in the map is not a concern, then there is no extra work. If the order is important, then you need to add a sequence number to all inserted records when persisting them to the backend. The preload mechanism should be written to insert the records on startup using this order.
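The sequence-number idea can be sketched as follows. This is a standalone illustration that uses only the standard Java library and does not use the Loader API. The Row class, its fields, and the preloadInOrder method are hypothetical, and a LinkedHashMap stands in for the FIFO map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of preloading records in their original insertion order. */
final class PreloadOrderSketch {

    /** Hypothetical backend record: the sequence number was assigned when the record was persisted. */
    static final class Row {
        final long sequence;
        final String key;
        final String value;

        Row(long sequence, String key, String value) {
            this.sequence = sequence;
            this.key = key;
            this.value = value;
        }
    }

    /** Sorts the rows by sequence number, then inserts them so that map order matches that order. */
    static Map<String, String> preloadInOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong((Row r) -> r.sequence));
        Map<String, String> map = new LinkedHashMap<>(); // stands in for the FIFO map
        for (Row row : sorted) {
            map.put(row.key, row.value);
        }
        return map;
    }

    public static void main(String[] args) {
        // The backend may return rows in any order.
        List<Row> rows = Arrays.asList(
            new Row(3, "event-c", "third"),
            new Row(1, "event-a", "first"),
            new Row(2, "event-b", "second"));
        System.out.println(preloadInOrder(rows).keySet());
    }
}
```

With a relational backend, the same ordering could instead come from the query that reads the records, for example by ordering on the sequence column.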
© Copyright IBM Corporation 2007,2009. All Rights Reserved.