Example 3: Unmanaged MQ subscriber
The unmanaged subscriber is an important class of subscriber application. With it, you combine the benefits of publish/subscribe with control of queuing and consumption of publications. An unmanaged subscription is one where the application is responsible for specifying the queue where the publications for the subscription are stored. The example demonstrates different ways of combining subscriptions and queues.
The unmanaged pattern is more commonly associated with durable subscriptions than with non-durable ones. Typically the lifecycle of a subscription created by an unmanaged subscriber is independent of the lifecycle of the subscribing application itself. Because the subscription is durable, it receives publications even when no subscribing application is active.
You can create durable managed subscriptions to achieve the same result, but some applications require more flexibility and control over queues and messages than is possible with a managed subscription. For a durable managed subscription, the queue manager creates a permanent queue for the publications that match the subscription topic. It deletes the queue and associated publications when the subscription is deleted.
Typically durable managed subscriptions are used if the lifecycle of the application and the subscription is essentially the same, but hard to guarantee. By making the subscription durable, and having the publisher create persistent publications, there are no lost messages should the queue manager or subscriber terminate prematurely and need to be recovered.
For non-JMS applications, or JMS applications that are not using a shared subscription, the queue manager will implicitly open the durable managed subscription queue for a subscriber in such a way that shared processing of the queue is not possible. In addition, unless your application is using JMS shared subscriptions, it is not possible to create more than one subscription for each managed queue and you might find the queues harder to manage because you have less control over the names of the queues. For these reasons, consider whether the unmanaged MQ subscriber is a better fit for applications requiring durable subscriptions than the managed MQ subscriber.
- On demand subscriptions: the subscription topic strings are dynamic. They are provided by the application when it runs.
- Simplified subscription topic management: subscription topic management is simplified by defining the root part of the subscription topic string using an administratively defined topic. This hides the root part of the topic tree from the application. By hiding the root part a subscriber can be deployed to different topic trees.
- Flexible subscription management: you can define a subscription either administratively, or create it on-demand in a subscriber program. There is no difference between administratively and programmatically created subscriptions, except an attribute that shows how the subscription was created. There is a third type of subscription that is created automatically by the queue manager for distribution of subscriptions. All subscriptions are displayed in the IBM® MQ Explorer.
- Flexible association of subscriptions with queues: a predefined local queue is associated with a subscription by the MQSUB function. There are different ways to use MQSUB to associate subscriptions with queues:
  - Associate a subscription with a queue having no existing subscriptions, MQSO_CREATE + (Hobj from MQOPEN).
  - Associate a new subscription with a queue having existing subscriptions, MQSO_CREATE + (Hobj from MQOPEN).
  - Move an existing subscription to a different queue, MQSO_ALTER + (Hobj from MQOPEN).
  - Resume an existing subscription associated with an existing queue, MQSO_RESUME + (Hobj = MQHO_NONE), or MQSO_RESUME + (Hobj from MQOPEN of queue with existing subscription).
  - By combining MQSO_CREATE | MQSO_RESUME | MQSO_ALTER in different combinations, you can cater for different input states of the subscription and the queue without having to code multiple versions of MQSUB with different sd.Options values.
  - Alternatively, by coding a specific choice of MQSO_CREATE | MQSO_RESUME | MQSO_ALTER, the queue manager returns an error (Table 1) if the states of the subscription and queue provided as input to MQSUB are inconsistent with the value of sd.Options. Figure 9 shows the results of issuing MQSUB for Subscription X with different individual settings of the sd.Options flag, and passing it three different object handles.

  Explore different inputs to the example program in Figure 2 to become familiar with these different kinds of errors. One common error, RC = 2440, that is not included in the cases listed in the table, is a subscription name error. It is commonly caused by passing a null or invalid subscription name with MQSO_RESUME or MQSO_ALTER.
- Multiprocessing: You can share among many consumers the work of reading publications. The publications all go onto the single queue associated with the subscription topic. Consumers have a choice of opening the queue directly using MQOPEN or resuming the subscription using MQSUB.
- Subscription concentration: multiple subscriptions can be created on the same queue. Be cautious with this capability as it can lead to overlapping subscriptions, and receiving the same publication multiple times. The MQSO_GROUP_SUB option eliminates duplicate publications caused by overlapping subscriptions.
- Subscriber and consumer separation: As well as the three consumer models illustrated in the examples, another model is to separate the consumer from the subscriber. It is a variation of the unmanaged MQ subscriber, but rather than issuing the MQOPEN and MQSUB in the same program, one program subscribes to publications, and another program consumes them. For example, the subscriber might be part of a publish/subscribe cluster and the consumer attached to a queue manager outside the queue manager cluster. The consumer receives publications through standard distributed queuing by defining the subscription queue as a remote queue definition.
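The way the MQSO_CREATE, MQSO_RESUME, and MQSO_ALTER choices combine with the other subscription options can be sketched as a small helper. This is a minimal illustration, not part of the example program: the function name ex_sub_options and the EX_ constants are hypothetical, and the numeric values are assumed to mirror those in cmqc.h (real code should include cmqc.h and use the MQSO_ names directly).

```c
#include <stddef.h>

/* Assumed to mirror the MQSO_* values in cmqc.h; hypothetical EX_ names. */
#define EX_MQSO_ALTER             0x00000001L
#define EX_MQSO_CREATE            0x00000002L
#define EX_MQSO_RESUME            0x00000004L
#define EX_MQSO_DURABLE           0x00000008L
#define EX_MQSO_FAIL_IF_QUIESCING 0x00002000L

/* Map an optional action argument (A..., C..., R...) to sd.Options.
   A NULL or unrecognised argument keeps CREATE | RESUME, which copes
   with the subscription either existing already or not. */
long ex_sub_options(const char *action)
{
    long verb = EX_MQSO_CREATE | EX_MQSO_RESUME;

    if (action != NULL) {
        switch (action[0]) {
        case 'A': verb = EX_MQSO_ALTER;  break;
        case 'C': verb = EX_MQSO_CREATE; break;
        case 'R': verb = EX_MQSO_RESUME; break;
        default:  break;              /* keep the combined default */
        }
    }
    return verb | EX_MQSO_DURABLE | EX_MQSO_FAIL_IF_QUIESCING;
}
```

Calling ex_sub_options(NULL) gives the combined setting, while ex_sub_options("Resume") selects a single action so that an inconsistent subscription or queue state is reported as an error rather than silently handled.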
Understanding the behavior of MQSO_CREATE | MQSO_RESUME | MQSO_ALTER is important, especially if you plan to simplify your code by using combinations of these options. Study Table 1, which shows the results of passing different queue handles to MQSUB, and the results of running the example program shown in Figure 4 to Figure 9.
The scenario used to construct the table has one subscription X and two queues, A and B. The subscription name parameter sd.SubName is set to X, the name of a subscription attached to queue A. Queue B has no subscription attached to it.
MQSUB is passed subscription X and the queue handle to queue A. The results from the subscription options are as follows:
- MQSO_CREATE fails because the queue handle corresponds to queue A, which already has a subscription to X. Contrast this behavior with the successful call for queue B; that call succeeds because queue B does not have a subscription to X attached to it.
- MQSO_RESUME succeeds because the queue handle corresponds to queue A, which already has a subscription to X. In contrast, the call fails where the subscription X does not exist on queue A.
- MQSO_ALTER behaves in a similar way to MQSO_RESUME with respect to opening the subscription and queue. However, if the attributes contained within the subscription descriptor passed to MQSUB differ from the attributes of the subscription, MQSO_RESUME fails, whereas MQSO_ALTER succeeds as long as the program instance has permission to alter the attributes. Note that you can never change the topic string in a subscription; but rather than return an error, MQSUB ignores the topic name and topic string values in the subscription descriptor and uses the values in the existing subscription.

MQSUB is passed subscription X and the queue handle to queue B. The results from the subscription options are as follows:
- MQSO_CREATE succeeds and creates subscription X on queue B because this is a new subscription on queue B.
- MQSO_RESUME fails. MQSUB looks for subscription X on queue B and does not find it, but rather than returning RC = 2428 - subscription X does not exist, it returns RC = 2019 - Subscription queue does not match queue object handle. The behavior of the third option, MQSO_ALTER, suggests the reason for this unexpected error. MQSUB expects the queue handle to point to a queue with a subscription. It checks this first before checking whether the subscription named in sd.SubName exists.
- MQSO_ALTER succeeds, and moves the subscription from queue A to queue B.
A case that is not shown in the table is if the subscription name of the subscription on queue A does not match the subscription name in sd.SubName. That call fails with a RC = 2428 - subscription X does not exist on Queue A.
| Queue handles | | |
|---|---|---|
| Hobj for Queue A passed to MQSUB | | |
| Hobj for Queue B passed to MQSUB | | |
| MQHO_NONE passed to MQSUB | | |
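When experimenting with the example, it can help to turn the numeric reason codes mentioned in this topic into readable text. The following sketch is an assumption-laden convenience, not part of the example program: ex_describe_reason is a hypothetical name, and the MQRC_ names are the ones believed to correspond to these numbers in cmqc.h.

```c
/* Hypothetical helper: describe the reason codes discussed in this topic.
   The MQRC_* names are assumed to match cmqc.h; verify against your headers. */
const char *ex_describe_reason(long reason)
{
    switch (reason) {
    case 0:    return "MQRC_NONE: call completed";
    case 2019: return "MQRC_HOBJ_ERROR: subscription queue does not match queue object handle";
    case 2033: return "MQRC_NO_MSG_AVAILABLE: no publication arrived within the wait interval";
    case 2038: return "MQRC_NOT_OPEN_FOR_INQUIRE: queue was not opened with MQOO_INQUIRE";
    case 2428: return "MQRC_NO_SUBSCRIPTION: subscription does not exist";
    case 2432: return "MQRC_SUB_ALREADY_EXISTS: subscription already exists";
    case 2440: return "MQRC_SUB_NAME_ERROR: null or invalid subscription name";
    case 2510: return "MQRC_TOPIC_NOT_ALTERABLE: topic of a subscription cannot be altered";
    default:   return "reason code not covered by this sketch";
    }
}
```

A call such as printf("%s\n", ex_describe_reason(Reason)) after MQSUB would then print the text for the failing case instead of only the number.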
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <cmqc.h>
void inquireQname(MQHCONN HConn, MQHOBJ Hobj, MQCHAR48 qName);
int main(int argc, char **argv)
{
MQCHAR48 topicNameDefault = "STOCKS";
char topicStringDefault[] = "IBM/PRICE";
char subscriptionNameDefault[] = "IBMSTOCKPRICESUB";
char subscriptionQueueDefault[] = "STOCKTICKER";
char publicationBuffer[101]; /* Allocate to receive messages */
char resTopicStrBuffer[151]; /* Allocate to resolve topic string */
MQCHAR48 qmName = ""; /* Default queue manager */
MQCHAR48 qName = ""; /* Allocate storage for MQINQ */
MQHCONN Hconn = MQHC_UNUSABLE_HCONN; /* connection handle */
MQHOBJ Hobj = MQHO_NONE; /* subscription queue handle */
MQHOBJ Hsub = MQHO_NONE; /* subscription handle */
MQLONG CompCode = MQCC_OK; /* completion code */
MQLONG Reason = MQRC_NONE; /* reason code */
MQLONG messlen = 0;
MQOD od = {MQOD_DEFAULT}; /* Unmanaged subscription queue */
MQSD sd = {MQSD_DEFAULT}; /* Subscription Descriptor */
MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */
MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */
MQLONG sdOptions = MQSO_CREATE | MQSO_RESUME | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING;
char * topicName = topicNameDefault;
char * topicString = topicStringDefault;
char * subscriptionName = subscriptionNameDefault;
char * subscriptionQueue = subscriptionQueueDefault;
char * publication = publicationBuffer;
char * resTopicStr = resTopicStrBuffer;
memset(resTopicStrBuffer, 0, sizeof(resTopicStrBuffer));
switch(argc){ /* Replace defaults with args if provided */
default:
switch((argv[5][0])) {
case('A'): sdOptions = MQSO_ALTER | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING;
break;
case('C'): sdOptions = MQSO_CREATE | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING;
break;
case('R'): sdOptions = MQSO_RESUME | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING;
break;
default: ;
}
case(5):
if (strcmp(argv[4],"/")) /* "/" invalid = No subscription */
subscriptionQueue = argv[4];
else {
*subscriptionQueue = '\0';
if (argc > 5) {
if (argv[5][0] == 'C') {
sdOptions = sdOptions + MQSO_MANAGED;
}
}
else
sdOptions = sdOptions + MQSO_MANAGED;
}
case(4):
if (strcmp(argv[3],"/")) /* "/" invalid = No subscription */
subscriptionName = argv[3];
else {
*subscriptionName = '\0';
sdOptions = sdOptions - MQSO_DURABLE;
}
case(3):
if (strcmp(argv[2],"/")) /* "/" invalid = No topic string */
topicString = argv[2];
else
*topicString = '\0';
case(2):
if (strcmp(argv[1],"/")) /* "/" invalid = No topic object */
topicName = argv[1];
else
*topicName = '\0';
case(1):
sd.Options = sdOptions;
printf("Optional parameters: ");
printf("topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)\n");
printf("Values \"%-.48s\" \"%s\" \"%s\" \"%-.48s\" sd.Options=%d\n",
topicName, topicString, subscriptionName, subscriptionQueue, sd.Options);
}
- switch((argv[5][0]))
  You have the choice of entering A(lter) | C(reate) | R(esume) in parameter 5, to test the effect of overriding part of the MQSUB option setting used by default in the example. The default setting used by the example is MQSO_CREATE | MQSO_RESUME | MQSO_DURABLE. Note: Setting MQSO_ALTER or MQSO_RESUME without setting MQSO_DURABLE is an error, and sd.SubName must be set and refer to a subscription that can be resumed or altered.
- *subscriptionQueue = '\0'; sdOptions = sdOptions + MQSO_MANAGED;
  If the default subscription queue, STOCKTICKER, is replaced by a null string then, as long as MQSO_CREATE is set, the example sets the MQSO_MANAGED flag and creates a dynamic subscription queue. If Alter or Resume is set in the fifth parameter, the behavior of the example depends on the value of subscriptionName.
- *subscriptionName = '\0'; sdOptions = sdOptions - MQSO_DURABLE;
  If the default subscription, IBMSTOCKPRICESUB, is replaced by a null string then the example removes the MQSO_DURABLE flag. If you run the example providing the default values for the other parameters, an additional temporary subscription destined to STOCKTICKER is created and receives duplicate publications. Next time you run the example, without any parameters, you receive just one publication again.
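The option adjustments described above can also be written with bitwise operators, which remain correct even if a flag is already set or already clear (adding or subtracting a flag value is only safe when its current state is known). The sketch below is an illustration under stated assumptions: ex_is_placeholder and ex_adjust_options are hypothetical helpers, and the EX_ constants are assumed to mirror the MQSO_ values in cmqc.h.

```c
#include <string.h>

/* Assumed to mirror the MQSO_* values in cmqc.h; hypothetical EX_ names. */
#define EX_MQSO_CREATE  0x00000002L
#define EX_MQSO_DURABLE 0x00000008L
#define EX_MQSO_MANAGED 0x00000020L

/* "/" on the command line stands for "no value supplied". */
int ex_is_placeholder(const char *arg)
{
    return arg != NULL && strcmp(arg, "/") == 0;
}

/* Adjust sd.Options in the way the example describes:
   - no queue name and MQSO_CREATE set: ask for a managed queue;
   - no subscription name: the subscription cannot be durable. */
long ex_adjust_options(long options, const char *subName, const char *subQueue)
{
    if (ex_is_placeholder(subQueue) && (options & EX_MQSO_CREATE))
        options |= EX_MQSO_MANAGED;    /* set the flag, idempotent */
    if (ex_is_placeholder(subName))
        options &= ~EX_MQSO_DURABLE;   /* clear the flag, idempotent */
    return options;
}
```

For instance, starting from the Create setting and passing "/" as the subscription name clears MQSO_DURABLE, which corresponds to the non-durable run shown later in the results.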
do {
MQCONN(qmName, &Hconn, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
if (strlen(subscriptionQueue)) {
strncpy(od.ObjectName, subscriptionQueue, MQ_Q_NAME_LENGTH);
MQOPEN(Hconn, &od, MQOO_INPUT_AS_Q_DEF | MQOO_FAIL_IF_QUIESCING | MQOO_INQUIRE,
&Hobj, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
}
strncpy(sd.ObjectName, topicName, MQ_TOPIC_NAME_LENGTH);
sd.ObjectString.VSPtr = topicString;
sd.ObjectString.VSLength = MQVS_NULL_TERMINATED;
sd.SubName.VSPtr = subscriptionName;
sd.SubName.VSLength = MQVS_NULL_TERMINATED;
sd.ResObjectString.VSPtr = resTopicStr;
sd.ResObjectString.VSBufSize = sizeof(resTopicStrBuffer)-1;
MQSUB(Hconn, &sd, &Hobj, &Hsub, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
gmo.Options = MQGMO_WAIT | MQGMO_NO_SYNCPOINT | MQGMO_CONVERT;
gmo.WaitInterval = 10000;
gmo.MatchOptions = MQMO_MATCH_CORREL_ID;
memcpy(md.CorrelId, sd.SubCorrelId, MQ_CORREL_ID_LENGTH);
inquireQname(Hconn, Hobj, qName);
printf("Waiting %d seconds for publications matching \"%s\" from %-.48s\n",
gmo.WaitInterval/1000, resTopicStr, qName);
do {
memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));
md.Encoding = MQENC_NATIVE;
md.CodedCharSetId = MQCCSI_Q_MGR;
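/* Use the buffer size, not the pointer size, and leave room for a terminator */
MQGET(Hconn, Hobj, &md, &gmo, sizeof(publicationBuffer)-1, publication, &messlen, &CompCode, &Reason);
if (Reason == MQRC_NONE) {
publication[messlen] = '\0'; /* MQGET does not null-terminate the message data */
printf("Received publication \"%s\"\n", publication);
}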
}
while (CompCode == MQCC_OK);
if (CompCode != MQCC_OK && Reason != MQRC_NO_MSG_AVAILABLE) break;
MQCLOSE(Hconn, &Hsub, MQCO_NONE, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
MQCLOSE(Hconn, &Hobj, MQCO_NONE, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
MQDISC(&Hconn, &CompCode, &Reason);
} while (0);
printf("Completion code %d and Return code %d\n", CompCode, Reason);
}
void inquireQname(MQHCONN Hconn, MQHOBJ Hobj, MQCHAR48 qName) {
#define _selectors 1
#define _intAttrs 1
MQLONG select[_selectors] = {MQCA_Q_NAME}; /* Array of attribute selectors */
MQLONG intAttrs[_intAttrs]; /* Array of integer attributes */
MQLONG CompCode, Reason;
MQINQ(Hconn, Hobj, _selectors, select, _intAttrs, intAttrs, MQ_Q_NAME_LENGTH, qName, &CompCode, &Reason);
if (CompCode != MQCC_OK) {
printf("MQINQ failed with Condition code %d and Reason %d\n", CompCode, Reason);
strncpy(qName, "unknown queue", MQ_Q_NAME_LENGTH);
}
return;
}
- if (strlen(subscriptionQueue))
  If there is no subscription queue name then the example uses MQHO_NONE as the value of Hobj.
- MQOPEN(...);
  The subscription queue is opened and the queue handle saved in Hobj.
- MQSUB(Hconn, &sd, &Hobj, &Hsub, &CompCode, &Reason);
  The subscription is opened using the Hobj passed from MQOPEN (or MQHO_NONE if there is no subscription queue name). An unmanaged queue can be resumed without explicitly opening it with an MQOPEN.
- MQCLOSE(Hconn, &Hsub, MQCO_NONE, &CompCode, &Reason);
  The subscription is closed using the subscription handle. Depending on whether the subscription is durable or not, the subscription is closed with an implicit MQCO_KEEP_SUB or MQCO_REMOVE_SUB. You can close a durable subscription with MQCO_REMOVE_SUB, but you cannot close a non-durable subscription with MQCO_KEEP_SUB. The action of MQCO_REMOVE_SUB is to remove the subscription, which stops any further publications being sent to the subscription queue.
- MQCLOSE(Hconn, &Hobj, MQCO_NONE, &CompCode, &Reason);
  No special action is taken if the subscription is unmanaged. If the queue is managed and the subscription is closed with either an explicit or implicit MQCO_REMOVE_SUB, then all publications are purged from the queue and the queue is deleted at this point.
- gmo.MatchOptions = MQMO_MATCH_CORREL_ID; memcpy(md.CorrelId, sd.SubCorrelId, MQ_CORREL_ID_LENGTH);
  Ensure that the messages received are those for our subscription.
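The close rules described above can be summarised in a short sketch. It is only a model of the stated behavior, not queue manager code: the function names are hypothetical, and the EX_ constants are assumed to mirror the MQSO_DURABLE, MQCO_KEEP_SUB, and MQCO_REMOVE_SUB values in cmqc.h.

```c
/* Assumed to mirror values in cmqc.h; hypothetical EX_ names. */
#define EX_MQSO_DURABLE    0x00000008L
#define EX_MQCO_KEEP_SUB   0x00000004L
#define EX_MQCO_REMOVE_SUB 0x00000008L

/* Close option implied by MQCO_NONE on the subscription handle:
   durable subscriptions are kept, non-durable ones are removed. */
long ex_implicit_close_option(long sdOptions)
{
    return (sdOptions & EX_MQSO_DURABLE) ? EX_MQCO_KEEP_SUB
                                         : EX_MQCO_REMOVE_SUB;
}

/* Returns 1 if an explicit close option is allowed for the subscription.
   Only KEEP_SUB on a non-durable subscription is rejected. */
int ex_close_option_allowed(long sdOptions, long closeOption)
{
    if (closeOption == EX_MQCO_KEEP_SUB && !(sdOptions & EX_MQSO_DURABLE))
        return 0;
    return 1;
}
```

A program that wants to delete a durable subscription when it ends would therefore pass MQCO_REMOVE_SUB explicitly rather than relying on MQCO_NONE.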
Results from the example illustrate aspects of publish/subscribe:
Publish 130 on the NYSE/IBM/PRICE topic.
W:\Subscribe3\Debug>..\..\Publish2\Debug\publishstock
Provide parameters: TopicObject TopicString Publication
Publish "130" to topic "STOCKS" and topic string "IBM/PRICE"
Published "130" to topic string "NYSE/IBM/PRICE"
Completion code 0 and Return code 0
Run the example with its default parameters to receive the publication 130. The provided topic object and topic string are ignored, as shown in Figure 9. The topic object and topic string are always taken from the subscription object, when one is provided, and the topic string is immutable. The actual behavior of the example depends on the choice or combination of MQSO_CREATE, MQSO_RESUME, and MQSO_ALTER. In this example MQSO_RESUME is the option selected.
W:\Subscribe3\Debug>solution3
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "STOCKS" "IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8206
Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER
Received publication "130"
Completion code 0 and Return code 0
In the next run no subscription queue name is supplied, so the queue is opened implicitly by MQSUB. The 2038 error from MQINQ is due to the implicit MQOPEN of STOCKTICKER by MQSUB not including the MQOO_INQUIRE option. Avoid the 2038 return code from MQINQ by opening the queue explicitly.
W:\Subscribe3\Debug>solution3 STOCKS IBM/PRICE IBMSTOCKPRICESUB / Resume
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "STOCKS" "IBM/PRICE" "IBMSTOCKPRICESUB" "" sd.Options=8204
MQINQ failed with Condition code 2 and Reason 2038
Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from unknown queue
Completion code 0 and Return code 0
W:\Subscribe3\Debug>solution3 STOCKS IBM/PRICE / STOCKTICKER Create
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "STOCKS" "IBM/PRICE" "" "STOCKTICKER" sd.Options=8194
Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER
Received publication "130"
Completion code 0 and Return code 0
The IBMSTOCKPRICESUB subscription is still active on the STOCKTICKER queue. The example illustrates that it is the queue that has subscriptions, and not the application. Despite not referring to the IBMSTOCKPRICESUB subscription in this invocation of the application, the application receives the publication twice: once from the durable subscription that was created administratively, and once from the non-durable subscription created by the application itself.
W:\Subscribe3\Debug>..\..\Publish2\Debug\publishstock
Provide parameters: TopicObject TopicString Publication
Publish "130" to topic "STOCKS" and topic string "IBM/PRICE"
Published "130" to topic string "NYSE/IBM/PRICE"
Completion code 0 and Return code 0
W:\Subscribe3\Debug>solution3 STOCKS IBM/PRICE / STOCKTICKER Create
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "STOCKS" "IBM/PRICE" "" "STOCKTICKER" sd.Options=8194
Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER
Received publication "130"
Received publication "130"
Completion code 0 and Return code 0
- In the first case, Resume resumes the existing subscription, as you might expect, and ignores the changed topic string.
- In the second case, Alter causes an error, RC = 2510, Topic not alterable.
- In the third example, Create causes an error, RC = 2432, Sub already exists.
W:\Subscribe3\Debug>solution3 "" NASDAC/IBM/PRICE IBMSTOCKPRICESUB STOCKTICKER Resume
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "" "NASDAC/IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8204
Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER
Received publication "130"
Completion code 0 and Return code 0
W:\Subscribe3\Debug>solution3 "" NASDAC/IBM/PRICE IBMSTOCKPRICESUB STOCKTICKER Alter
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "" "NASDAC/IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8201
Completion code 2 and Return code 2510
W:\Subscribe3\Debug>solution3 "" NASDAC/IBM/PRICE IBMSTOCKPRICESUB STOCKTICKER Create
Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)
Values "" "NASDAC/IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8202
Completion code 2 and Return code 2432
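The sd.Options values printed in these runs (for example 8206 or 8194) are easier to compare once they are decoded into flag names. The following is a minimal sketch for that purpose: ex_decode_options is a hypothetical helper, and the flag values in the table are assumed to mirror the MQSO_ constants in cmqc.h. Only the flags used by this example are listed.

```c
#include <stdio.h>
#include <string.h>

struct ex_flag { long bit; const char *name; };

/* Flag values assumed to mirror cmqc.h; only flags used by the example. */
static const struct ex_flag ex_flags[] = {
    {0x00000001L, "MQSO_ALTER"},
    {0x00000002L, "MQSO_CREATE"},
    {0x00000004L, "MQSO_RESUME"},
    {0x00000008L, "MQSO_DURABLE"},
    {0x00000020L, "MQSO_MANAGED"},
    {0x00002000L, "MQSO_FAIL_IF_QUIESCING"},
};

/* Write the names of the known flags set in options into out,
   separated by " | ". Stops early if out is too small. Returns out. */
char *ex_decode_options(long options, char *out, size_t outSize)
{
    size_t i, used = 0;

    if (outSize == 0)
        return out;
    out[0] = '\0';
    for (i = 0; i < sizeof(ex_flags) / sizeof(ex_flags[0]); i++) {
        if (options & ex_flags[i].bit) {
            int n = snprintf(out + used, outSize - used, "%s%s",
                             used ? " | " : "", ex_flags[i].name);
            if (n < 0 || (size_t)n >= outSize - used)
                break;                 /* output truncated */
            used += (size_t)n;
        }
    }
    return out;
}
```

Decoding the values from the runs above shows, for instance, that 8194 lacks MQSO_DURABLE, which is consistent with the run that passed "/" as the subscription name.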