Maps as FIFO queues
With WebSphere® eXtreme Scale, you can provide a first-in first-out (FIFO) queue-like capability for all maps. WebSphere eXtreme Scale tracks the insertion order for all maps. A client can ask a map for the next unlocked entry in a map in the order of insertion and lock the entry. This process allows multiple clients to consume entries from the map efficiently.
FIFO example
The following code snippet shows a client entering a loop to process entries from the map until the map is exhausted. The loop starts a transaction, then calls the ObjectMap.getNextKey(5000) method. This method returns the key of the next available unlocked entry and locks it. If the transaction is blocked for more than 5000 milliseconds, then the method returns null.Session session = ...;
ObjectMap map = session.getMap("xxx");
// this needs to be set somewhere to stop this loop
boolean timeToStop = false;
while(!timeToStop)
{
session.begin();
Object msgKey = map.getNextKey(5000);
if(msgKey == null)
{
// current partition is exhausted, call it again in
// a new transaction to move to next partition
session.rollback();
continue;
}
Message m = (Message)map.get(msgKey);
// now consume the message
...
// need to remove it
map.remove(msgKey);
session.commit();
}
Local mode versus client mode
If the application is using a local core, that is, it is not a client, then the mechanism works as described previously.For client mode, if the Java™ virtual machine (JVM) is a client, then the client initially connects to a random partition primary. If no work exists in that partition, then the client moves to the next partition to look for work. The client either finds a partition with entries or loops around to the initial random partition. If the client loops around to the initial partition, then it returns a null value to the application. If the client finds a partition with a map that has entries, then it consumes entries from there until no entries are available for the timeout period. After the timeout passes, then null is returned. This action means that when null is returned and a partitioned map is used, then it you should start a new transaction and resume listening. The previous code sample fragment has this behavior.
Example
Job.java
package tutorial.fifo;
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Job
{
@Id String jobId;
int jobState;
}
JobEvent.java
package tutorial.fifo;
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
import com.ibm.websphere.projector.annotations.OneToOne;
@Entity
public class JobEvent
{
@Id String eventId;
@OneToOne Job job;
}
The
job has as ID and state, which is an integer. Suppose you want to increment the state whenever an
event arrived. The events are stored in the JobEvent Map. Each entry has a reference to the job the
event concerns. The code for the listener to do this looks like the following
example:JobEventListener.java
package tutorial.fifo;
import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.em.EntityManager;
public class JobEventListener
{
boolean stopListening;
public synchronized void stopListening()
{
stopListening = true;
}
synchronized boolean isStopped()
{
return stopListening;
}
public void processJobEvents(Session session)
throws ObjectGridException
{
EntityManager em = session.getEntityManager();
ObjectMap jobEvents = session.getMap("JobEvent");
while(!isStopped())
{
em.getTransaction().begin();
Object jobEventKey = jobEvents.getNextKey(5000);
if(jobEventKey == null)
{
em.getTransaction().rollback();
continue;
}
JobEvent event = (JobEvent)em.find(JobEvent.class, jobEventKey);
// process the event, here we just increment the
// job state
event.job.jobState++;
em.getTransaction().commit();
}
}
}
The listener is started on a thread by the application. The listener runs until the stopListening method is called. The processJobEvents method is run on the thread until the stopListening method is called. The loop blocks waiting for an eventKey from the JobEvent Map and then uses the EntityManager to access the event object, dereference to the job and increment the state.
The EntityManager API does not have a getNextKey method, but the ObjectMap does. So, the code uses the ObjectMap for JobEvent to get the key. If a map is used with entities then it does not store objects anymore. Instead, it stores Tuples; a Tuple object for the key and a Tuple object for the value. The EntityManager.find method accepts a Tuple for the key.
em.getTransaction().begin();
Job job = em.find(Job.class, "Job Key");
JobEvent event = new JobEvent();
event.id = Random.toString();
event.job = job;
em.persist(event); // insert it
em.getTransaction().commit();
You
find the job for the event, construct an event, point it to the job, insert it in the JobEvent Map
and commit the transaction.