Заполнение MQ-заголовков в WebSphere Enterprise Service Bus с использованием специализированного механизма MQ SendExit

Технология связывания WebSphere MQ, предоставляемая с WebSphere ESB и WebSphere Process Server, позволяет встроенным MQ-приложениям обмениваться данными со средой Service Component Architecture (SCA). Технология WebSphere MQ Bindings (MQ-связывания) упрощает передачу и прием MQ-сообщений, но в WebSphere Integration Developer поле MQ-заголовка нельзя заполнить при помощи MQ-связываний. В этой статье рассматривается использование специализированного механизма MQ SendExit для заполнения MQ-заголовков, необходимых встроенным MQ-приложениям, и приводится пример исходного кода для установки AppIdentityData в MQ-заголовке.

Цзюнь Шэнь, старший консультант, группа WebSphere Lab Services, IBM

Цзюнь Шэнь (Jun Shen) – фотографияЦзюнь Шэнь (Jun Shen) работает старшим консультантом в группе WebSphere Lab Services. Специализируется на WebSphere Process Server и реализациях SOA. Имеет 10-летний опыт работы в ИТ-отрасли в Китае и Австралии. В 1999 году получил степень доктора философии по вычислительной технике в Университете Цинхуа в КНР. Связаться с ним можно по адресу junshen@au1.ibm.com.



14.12.2012

Введение

Технология IBM® WebSphere MQ Bindings в IBM WebSphere ESB позволяет передавать и принимать сообщения посредством WebSphere MQ. MQ-связывание автоматически добавляет дескриптор MQ Message Descriptor (MQMD) во все сообщения, в которых он отсутствует, но не изменяет существующий MQMD. Для создания и изменения MQ-заголовков в сообщении можно использовать примитив MQHeaderSetter, но в WebSphere ESB V6.2 нельзя заполнить некоторые поля, в том числе:

  • UserIdentifier
  • AppIdentityData
  • PutApplType
  • PutApplName
  • ApplOriginData

Хотя в WebSphere ESB V7 или более новой версии поля MQMD можно заполнить путем установки специального свойства MDCTX=SET_ALL_CONTEXT в определении JNDI-очереди, в данной статье описывается специализированный механизм MQ SendExit, предназначенный для решения этой проблемы в WebSphere ESB V6.2. В статье используются следующие продукты:

  • WebSphere Integration Developer V6.2.0.2.
  • WebSphere ESB V6.2.0.2 Integrated Test Environment (поставляемый с WebSphere Integration Developer).
  • WebSphere MQ V6.0.2.9.

Заполнение MQMD с использованием специализированного механизма MQ SendExit

Рассматриваемый пример проекта демонстрирует создание специализированного механизма MQ SendExit для заполнения AppIdentityData в MQMD:

Создание модуля

Прежде всего создайте тестовый модуль MQ-связывания:

  1. Создайте новый модуль под названием MQHeaderTest. Флажок Create mediation component должен быть отмечен.
  2. Откройте Dependencies для библиотеки, отметьте флажок Schema for simple JMS Data Bindings рядом с Predefined Resources и нажмите кнопку Save. Теперь доступны типы JMS-тела.
  3. В модуле создайте новый интерфейс под именем MQHeaderTestService.
  4. Создайте однонаправленную операцию под именем sendMessage.
  5. Измените тип input1 на JMSTextBody и нажмите кнопку Save для сохранения интерфейса.
    Рисунок 1. Интерфейс MQHeaderTestService
    Рисунок 1. Интерфейс MQHeaderTestService
  6. Создайте бизнес-объект под именем MQTestMessage.
    Рисунок 2. Бизнес-объект MQTestMessage
    Рисунок 2. Бизнес-объект MQTestMessage
  7. Создайте еще один интерфейс под именем TestRequestService.
    Рисунок 3. Интерфейс TestRequestService
    Рисунок 3. Интерфейс TestRequestService
  8. Щелкните правой кнопкой мыши на компоненте-посреднике, выберите Add: Interface, а затем TestRequestService.
  9. Добавьте Import в сборочную диаграмму.
  10. Соедините компонент-посредник с компонентом Import и при запросе о добавлении интерфейса выберите MQHeaderTestService.
  11. Щелкните правой кнопкой мыши на Import и выберите Generate Binding => Messaging Binding => MQ Binding.
  12. Укажите JNDI-имя для фабрики MQ-соединений и очереди, а затем нажмите кнопку OK.
    Рисунок 4. MQ-связывание
    Рисунок 4. MQ-связывание

    Сборочная диаграмма показана на рисунке 5.

    Рисунок 5. Сборочная диаграмма MQHeaderTest
    Рисунок 5. Сборочная диаграмма MQHeaderTest

Создание потока-посредника

Затем создайте поток-посредник (mediation flow) для преобразования значения appIdentityData и полезной нагрузки в бизнес-объекте MQTestMessage в строку, структура которой приведена ниже. Назначение данной операции состоит в передаче значения appIdentityData в специализированный MQ SendExit через содержимое MQ-сообщения.

Рисунок 6. Структура строки MQTestMessage
Рисунок 6. Структура строки MQTestMessage
  1. Дважды щелкните левой кнопкой мыши на компоненте-посреднике MQHeaderTest для генерирования реализации.
  2. В редакторе Mediation Flow Editor соедините request с sendMessage для создания потока.
  3. В Palette выберите Transformation => Business Object Map.
  4. Добавьте специализированный посредник в рабочую область.
  5. Соедините узел input с входом специализированного посредника.
  6. Соедините выход специализированного посредника с узлом callout. Поток-посредник должен выглядеть примерно так, как показано на рисунке 7.
    Рисунок 7. Поток-посредник MQHeaderTest
    Рисунок 7. Поток-посредник MQHeaderTest
  7. Выберите BOMapper1 для реализации отображения.
    Рисунок 8. Реализация BOMapper1
    Рисунок 8. Реализация BOMapper1
  8. Реализуйте специализированное отображение при помощи кода, приведенного в листинге 1.
    Листинг 1. Код специализированного отображения для BOMapper1
    //получить длину appIdentityData
    String appIDLength = 
     StringUtils.getHexString(((String)requestRequestMsg_request_input_appIdentityData) 
    .length());
    
    sendMessageRequestMsg_sendMessage_input1_value = appIDLength + 
        (String)requestRequestMsg_request_input_appIdentityData + 
        (String)requestRequestMsg_request_input_payload;

Создание специализированного MQ SendExit

Для реализации MQ SendExit создайте новую библиотеку под именем MQExitLib и новый Java-класс под именем MQExitImp в пакете com.ibm.test.mqexit.

Листинг 2. Специализированный MQ SendExit
package com.ibm.test.mqexit;

import java.io.ByteArrayOutputStream;

import com.ibm.mq.MQC;
import com.ibm.mq.MQChannelDefinition;
import com.ibm.mq.MQChannelExit;
import com.ibm.mq.data.MQDataOutputStream;
import com.ibm.ws.sca.internal.mq.exit.MQInternalSendExitImpl;

public class MQExitImp extends MQInternalSendExitImpl {
    
    public static final int mqQueueType = 131;
    public static final int queueControlOffset = 212;
    public static final int applIDControlOffset = 416;
    public static final int applIDOffset = 284;
    public static final int dataLengthOffset = 536;
    public static final int dataOffset = 540;

    public MQExitImp(String version) {
        super("Centrelink MQ SendExit implementation");
    }

    /**
     * 
     * Формат данных: AppIDLength(2 символа) + AppID + AppData
     */
    public byte[] sendExit(MQChannelExit exit, MQChannelDefinition chl,
            byte[] buf) {

        byte[] msg = super.sendExit(exit, chl, buf);

        if (exit.exitReason == MQChannelExit.MQXR_XMIT) {

            //отправить данные
            if (byteToInt(buf[controlFlag2]) == mqputRequest) {
                int controlFlag = byteToInt(buf[controlFlag3]);
                if ((controlFlag == firstSegment)||(controlFlag == 
onlySegment)) {

                    System.out.println("> Queue data input = " + 
StringUtils.getHexString(msg));
                    System.out.println("> remoteUserId = " + 
chl.remoteUserId);

                    //обработать данные сообщения
                    boolean reversed = (byteToInt(msg[controlFlag1]) == 
mqFapReversed);
                    int ccsid = readShortInt(msg, mqFapCcsidOffset, 
    reversed);

                    int encoding = readInt(msg, mqFapMqEncOffset,
                            reversed);

                    //установить IDENTITY_CONTEXT
                    int exitOptions = readInt(msg, applIDControlOffset, 
    reversed);
                    System.out.println("> Original Options = " + 
exitOptions);

                    if ((exitOptions & MQC.MQPMO_PASS_ALL_CONTEXT) == 
MQC.MQPMO_PASS_ALL_CONTEXT) return msg; 

                    ByteArrayOutputStream baos = null;
                    MQDataOutputStream mqdos = null;
                    try {
                        //установить AppID
                        String appIdLenStr = new String(msg, 
dataOffset, 2);
                        int appIDLen = 0;
                        try {
                            appIDLen = 
Integer.parseInt(appIdLenStr);
                        } catch (Exception ex) {
                            System.out.println("> appIDLen 
exception =  " + ex.getMessage());
                            return msg;
                        }
                        System.out.println("> appIDLen = " + 
appIDLen);

                        //установить параметры
                        baos = new ByteArrayOutputStream();
                        mqdos = new MQDataOutputStream(baos);
                        mqdos.setEncoding(encoding);
                        mqdos.setCCSID(ccsid);
                        mqdos.writeMQLONG(exitOptions | 
MQC.MQPMO_SET_IDENTITY_CONTEXT);

                        byte[] valBytes = baos.toByteArray();

                        System.out.println("> New Options = " + 
StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                applIDControlOffset,
                                valBytes.length);

                        //установить AppID
                        String appID = new String(msg, dataOffset+2, 
appIDLen);

                        valBytes = appID.getBytes();

                        System.out.println("> AppID = "   + 
StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                applIDOffset,
                                valBytes.length);

                        //установить длину AppData
                        int dataLength = readInt(msg, 
dataLengthOffset,   reversed);
                        System.out.println("> Original dataLength 
= " + dataLength);

                        baos = new ByteArrayOutputStream();
                        mqdos = new MQDataOutputStream(baos);
                        mqdos.setEncoding(encoding);
                        mqdos.setCCSID(ccsid);
                        mqdos.writeMQLONG(dataLength - appIDLen -2);

                        valBytes = baos.toByteArray();

                        System.out.println("> New dataLength = " 
+ StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                dataLengthOffset,
                                valBytes.length);

                        //установить AppData
                        byte[] newMsg = new byte[msg.length - 
appIDLen -2];
                        //скопировать заголовок
                        System.arraycopy(msg, 0, newMsg,
                                0,
                                dataOffset);

                        //скопировать данные
                        System.arraycopy(msg, dataOffset + appIDLen + 
2, newMsg,
dataOffset,
                                dataLength - appIDLen -2);

                        System.out.println("> Queue data output = 
"   + StringUtils.getHexString(newMsg));

                        return newMsg;

                    } catch (Exception e) {
                        e.printStackTrace(System.out);
                    } finally {
                        if (mqdos != null) {
                            try {
                                mqdos.close();
                            } catch(Exception e1) {}
                        }
                        if (baos != null) {
                            try {
                                baos.close();
                            } catch(Exception e1) {}
                        }
                    }
                }
            } else    if (byteToInt(buf[controlFlag2]) == mqQueueType) {
                System.out.println("> Queue connection input = " +
StringUtils.getHexString(msg));
                System.out.println("> remoteUserId = " + 
chl.remoteUserId);

                //обработать элемент управления queue
                boolean reversed = (byteToInt(msg[controlFlag1]) == 
mqFapReversed);
                int ccsid = readShortInt(msg, mqFapCcsidOffset,
                        reversed);

                int encoding = readInt(msg, mqFapMqEncOffset,
                        reversed);

                int exitOptions = readInt(msg, queueControlOffset,
                        reversed);

                System.out.println("> Original Options = " + 
exitOptions);
                if ((exitOptions & MQC.MQOO_PASS_ALL_CONTEXT) == 
MQC.MQOO_PASS_ALL_CONTEXT) return msg; 

                if ((exitOptions & MQC.MQOO_OUTPUT) == MQC.MQOO_OUTPUT) {
                    ByteArrayOutputStream baos = null;
                    MQDataOutputStream mqdos = null;
                    try {
                        baos = new ByteArrayOutputStream();
                        mqdos = new MQDataOutputStream(baos);
                        mqdos.setEncoding(encoding);
                        mqdos.setCCSID(ccsid);
                        mqdos.writeMQLONG(exitOptions | 
MQC.MQOO_SET_IDENTITY_CONTEXT);

                        byte[] valBytes = baos.toByteArray();

                        System.out.println("> New Options = " + 
StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                queueControlOffset,
                                valBytes.length);

                        System.out.println("> Queue connection 
output = " + StringUtils.getHexString(msg));

                    } catch (Exception e) {
                        e.printStackTrace(System.out);
                    } finally {
                        if (mqdos != null) {
                            try {
                                mqdos.close();
                            } catch(Exception e1) {}
                        }
                        if (baos != null) {
                            try {
                                baos.close();
                            } catch(Exception e1) {}
                        }
                    }
                }
            }
        }
        return msg;
    }
}

Специализированный класс SendExit расширяет com.ibm.ws.sca.internal.mq.exit.MQInternalSendExitImpl. Измените два оператора ветвления для изменения буфера MQ-сообщений:

  1. Если сообщение имеет тип mqQueue, установите MQOO_SET_IDENTITY_CONTEXT для параметров очереди.
  2. Если сообщение имеет тип mqputRequest, установите MQPMO_SET_IDENTITY_CONTEXT для параметров очереди. Значение appIdentityData будет извлекаться из содержимого сообщения и выдаваться в буфер сообщений. Значение appIdentityData будет удаляться из содержимого сообщения.

Развертывание специализированного MQ SendExit

Установка JAR-файла SendExit

  1. Выполните экспорт Java-проекта в JAR-файл.
  2. Скопируйте JAR-файл в каталог <WESB_HOME>/lib/ext, где WESB_HOME – это каталог установки WebSphere ESB.

Настройка фабрики соединений MQ-очереди

  1. Откройте консоль администратора WebSphere Process Server.
  2. Перейдите в Resources => JMS Providers => WebSphere MQ => WebSphere MQ queue connection factories => MQHeaderTest_QCF => Custom properties.
  3. Выберите свойство SENDEXIT и замените его значение на com.ibm.test.mqexit.MQExitImp.
    Рисунок 9. Специализированные свойства фабрики соединений MQ-очереди
    Рисунок 9. Специализированные свойства фабрики соединений MQ-очереди

Тестирование модуля

Используйте универсальный тестовый клиент для тестирования модуля:

  1. Щелкните правой кнопкой мыши на компоненте-посреднике MQHeaderTest и выберите Test component.
  2. Тело сообщения нас не интересует, поэтому нажмите кнопку Continue.
  3. В окне Deployment Location выберите WebSphere ESB Server и нажмите кнопку Finish. Запустится сервер, и приложение будет развернуто и активизировано. После завершения теста компонента отобразится ответ.
    Рисунок 10. Тестовый компонент
    Рисунок 10. Тестовый компонент
  4. В WebSphere MQ Explorer щелкните правой кнопкой мыши на очереди Send и выберите Browse Messages.
  5. Дважды щелкните левой кнопкой мыши на сообщении, и вы увидите идентификационные данные приложения, установленные в компоненте-посреднике.
    Рисунок 11. MQMessage
    Рисунок 11. MQMessage

Ниже приведен журнал трассировки:

Листинг 3. Журнал трассировки
[15/07/10 15:47:05:110 EST] 00000047 SystemOut     O   > 
Queue connection input = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F44202000000001000000052020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000000020
[15/07/10 15:47:05:110 EST] 00000047 SystemOut     O   > 
remoteUserId = Administrator
[15/07/10 15:47:05:110 EST] 00000047 SystemOut     O   > 
Original Options = 32
[15/07/10 15:47:05:515 EST] 00000047 SystemOut     O   > 
Queue connection input = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F44202000000001000000052020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000000020
[15/07/10 15:47:05:515 EST] 00000047 SystemOut     O   > 
remoteUserId = Administrator
[15/07/10 15:47:05:515 EST] 00000047 SystemOut     O   > 
Original Options = 32
[15/07/10 15:47:05:671 EST] 00000047 SystemOut     O   > 
Queue connection input = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F4420200000000100000001736D62496E202020202020202020202020202020202020202020202020202020
20202020202020202020202020202020514D5F69626D5F6C3374303534382020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000002010
[15/07/10 15:47:05:671 EST] 00000047 SystemOut     O   > 
remoteUserId = Administrator
[15/07/10 15:47:05:671 EST] 00000047 SystemOut     O   > 
Original Options = 8208
[15/07/10 15:47:05:718 EST] 00000047 SystemOut     O   > 
New Options = 00002410
[15/07/10 15:47:05:718 EST] 00000047 SystemOut     O   > 
Queue connection output = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F4420200000000100000001736D62496E202020202020202020202020202020202020202020202020202020
20202020202020202020202020202020514D5F69626D5F6C3374303534382020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000002410
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
Queue data input = 
545348200000023901863000000000000000000000000111035200000000023900000000000000000071FA40
4D442020000000020000000000000008FFFFFFFF0000000000000111000004B82020202020202020FFFFFFFF
0000000200000000000000000000000000000000000000000000000000000000000000000000000000000000
0000000000000000000000002020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020000000000000000000000000
0000000000000000000000000000000000000000202020202020202020202020202020202020202020202020
2020202020202020000000002020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202000000000000000000000000000000000000000000000000000000001
0000000000000000FFFFFFFF504D4F200000000100002002FFFFFFFF00000000000000000000000000000001
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
20202020202020200000001D3036546573744944546869732069732061207465737420737472696E67
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
remoteUserId = Administrator
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
Original Options = 8194
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
appIDLen = 6
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
New Options = 00002402
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
AppID = 546573744944
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
Original dataLength = 29
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
New dataLength = 00000015
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   > 
Queue data output = 54534820000002390186300000000000000000000000011103520000000002390000
0000000000000071FA404D442020000000020000000000000008FFFFFFFF0000000000000111000004B82020
202020202020FFFFFFFF00000002000000000000000000000000000000000000000000000000000000000000
0000000000000000000000000000000000000000000020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020200000
0000000000000000000000000000000000000000000000000000000000005465737449442020202020202020
2020202020202020202020202020202020200000000020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020000000000000000000000000000000000000
000000000000000000010000000000000000FFFFFFFF504D4F200000000100002402FFFFFFFF000000000000
0000000000000000000120202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
20202020202020202020202020202020202000000015546869732069732061207465737420737472696E67

Заключение

В данной статье рассматривалось использование специализированного механизма MQ SendExit для заполнения MQ-заголовков, необходимых встроенным MQ-приложениям.


Загрузка

ОписаниеИмяРазмер
Пример кодаMQHeaderTestPI.zip32 КБ

Ресурсы

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=WebSphere
ArticleID=851936
ArticleTitle=Заполнение MQ-заголовков в WebSphere Enterprise Service Bus с использованием специализированного механизма MQ SendExit
publish-date=12142012