Using shared subscriptions in JMS 2.0
JMS 2.0 introduces the concept of
shared subscriptions, where a single subscription is shared among multiple
consumers, with only one of the consumers receiving a publication at any point in time. IBM® MQ classes for JMS.
When you are developing a JMS application for IBM MQ 8.0 or later, you might need to consider the impact of this functionality on your queue manager.
The idea behind shared subscriptions is basically to share the load among multiple consumers. A durable subscription can also be shared among multiple consumers.
- Subscription
SUB, subscribing to a topic FIFA2014/UPDATES to receive football match updates, being shared by three consumersC1,C2, andC3 - Producer
P1publishing on the FIFA2014/UPDATES topic
When a publication is made on FIFA2014/UPDATES, the publication will be received by only one of
the three consumers (C1, or C2, or C3) but not
all.
The following sample demonstrates the usage of shared subscriptions, and also demonstrates the
usage of the additional API in JMS 2.0, Message.receiveBody(), to retrieve only
the message body.
The sample creates three subscriber threads, which create a shared subscription to the FIFA2014/UPDATES topic, and one publisher thread.
package mqv91Samples;
import javax.jms.JMSException;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import javax.jms.JMSContext;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.JMSConsumer;
import javax.jms.Message;
import javax.jms.JMSProducer;
/*
* Implements both Subscriber and Publisher
*/
class SharedNonDurableSubscriberAndPublisher implements Runnable {
private Thread t;
private String threadName;
SharedNonDurableSubscriberAndPublisher( String name){
threadName = name;
System.out.println("Creating Thread:" + threadName );
}
/*
* Demonstrates shared non-durable subscription in JMS 2.0
*/
private void sharedNonDurableSubscriptionDemo(){
JmsConnectionFactory cf = null;
JMSContext msgContext = null;
try {
// Create Factory for WMQ JMS provider
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
// Create connection factory
cf = ff.createConnectionFactory();
// Set MQ properties
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM3");
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
// Create message context
msgContext = cf.createContext();
// Create a topic destination
Topic fifaScores = msgContext.createTopic("/FIFA2014/UPDATES");
// Create a consumer. Subscription name specified, required for sharing of subscription.
JMSConsumer msgCons = msgContext.createSharedConsumer(fifaScores, "FIFA2014SUBID");
// Loop around to receive publications
while(true){
String msgBody=null;
// Use JMS 2.0 receiveBody method as we are interested in message body only.
msgBody = msgCons.receiveBody(String.class);
if(msgBody != null){
System.out.println(threadName + " : " + msgBody);
}
}
}catch(JMSException jmsEx){
System.out.println(jmsEx);
}
}
/*
* Publisher publishes match updates like current attendance in the stadium, goal score and ball possession by teams.
*/
private void matchUpdatePublisher(){
JmsConnectionFactory cf = null;
JMSContext msgContext = null;
int nederlandsGoals = 0;
int chileGoals = 0;
int stadiumAttendence = 23231;
int switchIndex = 0;
String msgBody = "";
int nederlandsHolding = 60;
int chileHolding = 40;
try {
// Create Factory for WMQ JMS provider
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
// Create connection factory
cf = ff.createConnectionFactory();
// Set MQ properties
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM3");
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
// Create message context
msgContext = cf.createContext();
// Create a topic destination
Topic fifaScores = msgContext.createTopic("/FIFA2014/UPDATES");
// Create publisher to publish updates from stadium
JMSProducer msgProducer = msgContext.createProducer();
while(true){
// Send match updates
switch(switchIndex){
// Attendance
case 0:
msgBody ="Stadium Attendence " + stadiumAttendence;
stadiumAttendence += 314;
break;
// Goals
case 1:
msgBody ="SCORE: The Netherlands: " + nederlandsGoals + " - Chile:" + chileGoals;
break;
// Ball possession percentage
case 2:
msgBody ="Ball possession: The Netherlands: " + nederlandsHolding + "% - Chile: " + chileHolding + "%";
if((nederlandsHolding > 60) && (nederlandsHolding < 70)){
nederlandsHolding -= 2;
chileHolding += 2;
}else{
nederlandsHolding += 2;
chileHolding -= 2;
}
break;
}
// Publish and wait for two seconds to publish next update
msgProducer.send (fifaScores, msgBody);
try{
Thread.sleep(2000);
}catch(InterruptedException iex){
}
// Increment and reset the index if greater than 2
switchIndex++;
if(switchIndex > 2)
switchIndex = 0;
}
}catch(JMSException jmsEx){
System.out.println(jmsEx);
}
}
/*
* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
// If this is a publisher thread
if(threadName == "PUBLISHER"){
matchUpdatePublisher();
}else{
// Create subscription and start receiving publications
sharedNonDurableSubscriptionDemo();
}
}
// Start thread
public void start (){
System.out.println("Starting " + threadName );
if (t == null)
{
t = new Thread (this, threadName);
t.start ();
}
}
}
/*
* Demonstrate JMS 2.0 Simplified API using IBM MQ v91 JMS Implementation
*/
public class Mqv91jms2Sample {
public static void main(String[] args) {
// TODO Auto-generated method stub
// Create first subscriber and start
SharedNonDurableSubscriberAndPublisher subOne = new SharedNonDurableSubscriberAndPublisher( "SUB1");
subOne.start();
// Create second subscriber and start
SharedNonDurableSubscriberAndPublisher subTwo = new SharedNonDurableSubscriberAndPublisher( "SUB2");
subTwo.start();
// Create third subscriber and start
SharedNonDurableSubscriberAndPublisher subThree = new SharedNonDurableSubscriberAndPublisher( "SUB3");
subThree.start();
// Create publisher and start
SharedNonDurableSubscriberAndPublisher publisher = new SharedNonDurableSubscriberAndPublisher( "PUBLISHER");
publisher.start();
}
}