使用自定义 MQ SendExit 在 WebSphere Enterprise Service Bus 中填充 MQ 标头

与 WebSphere ESB 和 WebSphere Process Server 一起提供的 WebSphere MQ 绑定,实现了本地 MQ 应用程序和 Service Component Architecture (SCA) 环境之间的通信。WebSphere MQ 绑定简化了 MQ 消息的发送和接收,但在 WebSphere Integration Developer 中不能用 MQ 绑定填充 MQ 标头字段。本文将向您介绍如何使用自定义 MQ SendExit 来填充本地 MQ 应用程序所需的 MQ 标头,本文还包括在 MQ 标头设置 AppIdentityData 的样例代码。

Jun Shen, 高级顾问,WebSphere Lab Services 团队, IBM

Jun Shen 的照片Jun Shen 是 WebSphere Lab Services 团队的一名高级顾问,专门研究 WebSphere Process Server 和 SOA 实现,在中国和澳大利亚的 IT 行业已有 10 年的工作经验。他于 1999 年从中国的清华大学获得了计算机科学博士学位。您可以发送电子邮件至 junshen@au1.ibm.com 与他取得联系。



2012 年 8 月 17 日

简介

IBM WebSphere ESB 中的 IBM® WebSphere® MQ 绑定让您可以通过 WebSphere MQ 发送和接收消息。MQ 绑定自动将 MQ Message Descriptor (MQMD) 添加到任何没有 MQMD 的消息中,但不会修改 MQMD。MQHeaderSetter 基元可用于对消息创建和更改 MQ 标头,但在 WebSphere ESB V6.2 中有些字段不能被填充,包括:

  • UserIdentifier
  • AppIdentityData
  • PutApplType
  • PutApplName
  • ApplOriginData

虽然在 WebSphere ESB V7 或其更高版本中,可以通过在 JNDI 队列定义上设置自定义属性 MDCTX= SET_ALL_CONTEXT 传播这些 MQMD 字段,但本文介绍了通过自定义 MQ SendExit 来解决 WebSphere ESB V6.2 中的这个问题。本文使用了下列产品:

  • WebSphere Integration Developer V6.2.0.2
  • WebSphere ESB V6.2.0.2 Integrated Test Environment(包括在 WebSphere Integration Developer 中)
  • WebSphere MQ V6.0.2.9

使用自定义 MQ SendExit 填充 MQMD

样例项目将向您展示如何在 MQMD 中编写一个自定义 MQ SendExit 来填充 AppIdentityData:

构建模块

首先,创建一个 MQ 绑定测试模块:

  1. 创建一个名称为 MQHeaderTest 的新模块。确保选中了 Create mediation component
  2. 在库上打开 Dependencies,在 Predefined Resources 下面,选中 Schema for simple JMS Data Bindings,然后单击 Save。JMS 正文类型现在已可用。
  3. 在该模块中,创建一个名称为 MQHeaderTestService 的新接口。
  4. 创建一个名称为 sendMessage 的单向操作。
  5. 将 input1 的类型更改为 JMSTextBody 并单击 Save 保存接口:
    图 1. MQHeaderTestService 接口
    MQHeaderTestService 接口
  6. 创建一个名称为 MQTestMessage 的业务对象:
    图 2. MQTestMessage 业务对象
    MQTestMessage 业务对象
  7. 创建名称为 TestRequestService 的另一个接口:
    图 3. TestRequestService 接口
    TestRequestService 接口
  8. 右键单击中介组件,选择 Add: Interface,并选择 TestRequestService
  9. 将一个 Import 添加到装配图。
  10. 将中介组件连接到 Import,当被要求添加一个引用时,选择 MQHeaderTestService
  11. 右键单击 Import 并选择 Generate Binding => Messaging Binding => MQ Binding
  12. 为 MQ 连接工厂和队列指定 JNDI 名称,然后单击 OK
    图 4. MQ 绑定
    MQ 绑定

    装配图如图 5 所示:

    图 5. MQHeaderTest 装配图
    图 5. MQHeaderTest 装配图

构建中介流

下一步,创建中介流,将在 MQTestMessage 业务对象中的 appIdentityData 值和有效载荷转换成一个字符串,其结构如下所示。该操作的目的是通过 MQ 消息内容将 appIdentityData 值传递给自定义 MQ SendExit。

图 6. MQTestMessage 字符串结构
MQTestMessage 字符串结构
  1. 双击中介组件 MQHeaderTest,生成一个实现。
  2. 在 Mediation Flow Editor 中,将 request 连接到 sendMessage 以创建一个流。
  3. 在 Palette 中,选中 Transformation => Business Object Map
  4. 将自定义中介添加到画布。
  5. 将输入节点连接到自定义中介的输入终端。
  6. 将自定义中介的输出终端连接到调出节点。您的中介流如图 7 所示:
    图 7. MQHeaderTest 中介流
    MQHeaderTest 中介流
  7. 单击 BOMapper1 以实现映射:
    图 8. BOMapper1 实现
    BOMapper1 实现
  8. 利用清单 1 中的代码实现自定义映射:
    清单 1. BOMapper1 的自定义代码
    //get the length of appIdentityData
    String appIDLength = 
     StringUtils.getHexString(((String)requestRequestMsg_request_input_appIdentityData) 
    .length());
    
    sendMessageRequestMsg_sendMessage_input1_value = appIDLength + 
        (String)requestRequestMsg_request_input_appIdentityData + 
        (String)requestRequestMsg_request_input_payload;

构建自定义 MQ SendExit

在 com.ibm.test.mqexit 包中创建一个名称为 MQExitLib 的新库和一个名称为 MQExitImp 的新 Java 类,以实现 MQ SendExit:

清单 2. 自定义 MQ SendExit
package com.ibm.test.mqexit;

import java.io.ByteArrayOutputStream;

import com.ibm.mq.MQC;
import com.ibm.mq.MQChannelDefinition;
import com.ibm.mq.MQChannelExit;
import com.ibm.mq.data.MQDataOutputStream;
import com.ibm.ws.sca.internal.mq.exit.MQInternalSendExitImpl;

public class MQExitImp extends MQInternalSendExitImpl {
    
    public static final int mqQueueType = 131;
    public static final int queueControlOffset = 212;
    public static final int applIDControlOffset = 416;
    public static final int applIDOffset = 284;
    public static final int dataLengthOffset = 536;
    public static final int dataOffset = 540;

    public MQExitImp(String version) {
        super("Centrelink MQ SendExit implementation");
    }

    /**
     * 
     * Data format: AppIDLength(2 chars) + AppID + AppData
     */
    public byte[] sendExit(MQChannelExit exit, MQChannelDefinition chl,
            byte[] buf) {

        byte[] msg = super.sendExit(exit, chl, buf);

        if (exit.exitReason == MQChannelExit.MQXR_XMIT) {

            //send data
            if (byteToInt(buf[controlFlag2]) == mqputRequest) {
                int controlFlag = byteToInt(buf[controlFlag3]);
                if ((controlFlag == firstSegment)||(controlFlag == 
onlySegment)) {

                    System.out.println(">>>>> Queue data input = " + 
StringUtils.getHexString(msg));
                    System.out.println(">>>>> remoteUserId = " + 
chl.remoteUserId);

                    //process message data
                    boolean reversed = (byteToInt(msg[controlFlag1]) == 
mqFapReversed);
                    int ccsid = readShortInt(msg, mqFapCcsidOffset, 
    reversed);

                    int encoding = readInt(msg, mqFapMqEncOffset,
                            reversed);

                    //set IDENTITY_CONTEXT
                    int exitOptions = readInt(msg, applIDControlOffset, 
    reversed);
                    System.out.println(">>>>> Original Options = " + 
exitOptions);

                    if ((exitOptions & MQC.MQPMO_PASS_ALL_CONTEXT) == 
MQC.MQPMO_PASS_ALL_CONTEXT) return msg; 

                    ByteArrayOutputStream baos = null;
                    MQDataOutputStream mqdos = null;
                    try {
                        //set AppID
                        String appIdLenStr = new String(msg, 
dataOffset, 2);
                        int appIDLen = 0;
                        try {
                            appIDLen = 
Integer.parseInt(appIdLenStr);
                        } catch (Exception ex) {
                            System.out.println(">>>>> appIDLen 
exception =  " + ex.getMessage());
                            return msg;
                        }
                        System.out.println(">>>>> appIDLen = " + 
appIDLen);

                        //set options
                        baos = new ByteArrayOutputStream();
                        mqdos = new MQDataOutputStream(baos);
                        mqdos.setEncoding(encoding);
                        mqdos.setCCSID(ccsid);
                        mqdos.writeMQLONG(exitOptions | 
MQC.MQPMO_SET_IDENTITY_CONTEXT);

                        byte[] valBytes = baos.toByteArray();

                        System.out.println(">>>>> New Options = " + 
StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                applIDControlOffset,
                                valBytes.length);

                        //set AppID
                        String appID = new String(msg, dataOffset+2, 
appIDLen);

                        valBytes = appID.getBytes();

                        System.out.println(">>>>> AppID = "   + 
StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                applIDOffset,
                                valBytes.length);

                        //set AppData Length
                        int dataLength = readInt(msg, 
dataLengthOffset,   reversed);
                        System.out.println(">>>>> Original dataLength 
= " + dataLength);

                        baos = new ByteArrayOutputStream();
                        mqdos = new MQDataOutputStream(baos);
                        mqdos.setEncoding(encoding);
                        mqdos.setCCSID(ccsid);
                        mqdos.writeMQLONG(dataLength - appIDLen -2);

                        valBytes = baos.toByteArray();

                        System.out.println(">>>>> New dataLength = " 
+ StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                dataLengthOffset,
                                valBytes.length);

                        //set AppData
                        byte[] newMsg = new byte[msg.length - 
appIDLen -2];
                        //copy header
                        System.arraycopy(msg, 0, newMsg,
                                0,
                                dataOffset);

                        //copy data
                        System.arraycopy(msg, dataOffset + appIDLen + 
2, newMsg,
dataOffset,
                                dataLength - appIDLen -2);

                        System.out.println(">>>>> Queue data output = 
"   + StringUtils.getHexString(newMsg));

                        return newMsg;

                    } catch (Exception e) {
                        e.printStackTrace(System.out);
                    } finally {
                        if (mqdos != null) {
                            try {
                                mqdos.close();
                            } catch(Exception e1) {}
                        }
                        if (baos != null) {
                            try {
                                baos.close();
                            } catch(Exception e1) {}
                        }
                    }
                }
            } else    if (byteToInt(buf[controlFlag2]) == mqQueueType) {
                System.out.println(">>>>> Queue connection input = " +
StringUtils.getHexString(msg));
                System.out.println(">>>>> remoteUserId = " + 
chl.remoteUserId);

                //process queue control
                boolean reversed = (byteToInt(msg[controlFlag1]) == 
mqFapReversed);
                int ccsid = readShortInt(msg, mqFapCcsidOffset,
                        reversed);

                int encoding = readInt(msg, mqFapMqEncOffset,
                        reversed);

                int exitOptions = readInt(msg, queueControlOffset,
                        reversed);

                System.out.println(">>>>> Original Options = " + 
exitOptions);
                if ((exitOptions & MQC.MQOO_PASS_ALL_CONTEXT) == 
MQC.MQOO_PASS_ALL_CONTEXT) return msg; 

                if ((exitOptions & MQC.MQOO_OUTPUT) == MQC.MQOO_OUTPUT) {
                    ByteArrayOutputStream baos = null;
                    MQDataOutputStream mqdos = null;
                    try {
                        baos = new ByteArrayOutputStream();
                        mqdos = new MQDataOutputStream(baos);
                        mqdos.setEncoding(encoding);
                        mqdos.setCCSID(ccsid);
                        mqdos.writeMQLONG(exitOptions | 
MQC.MQOO_SET_IDENTITY_CONTEXT);

                        byte[] valBytes = baos.toByteArray();

                        System.out.println(">>>>> New Options = " + 
StringUtils.getHexString(valBytes));

                        System.arraycopy(valBytes, 0, msg,
                                queueControlOffset,
                                valBytes.length);

                        System.out.println(">>>>> Queue connection 
output = " + StringUtils.getHexString(msg));

                    } catch (Exception e) {
                        e.printStackTrace(System.out);
                    } finally {
                        if (mqdos != null) {
                            try {
                                mqdos.close();
                            } catch(Exception e1) {}
                        }
                        if (baos != null) {
                            try {
                                baos.close();
                            } catch(Exception e1) {}
                        }
                    }
                }
            }
        }
        return msg;
    }
}

自定义 SendExit 类扩展了 com.ibm.ws.sca.internal.mq.exit.MQInternalSendExitImpl。可通过改变两种情况来改变 MQ 消息缓冲区:

  1. 如果消息类型是 mqQueue,将队列选项设置为 MQOO_SET_IDENTITY_CONTEXT。
  2. 如果消息类型是 mqputRequest,将消息选项设置为 MQPMO_SET_IDENTITY_CONTEXT 。将从消息内容检索 appIdentityData 值,并将它填充到消息缓冲区。然后,将从消息内容数据中删除 appIdentityData 值。

部署自定义 MQ SendExit

安装 SendExit JAR 文件

  1. 将 Java 项目导出为一个 JAR 文件。
  2. 将该 JAR 文件复制到目录 <WESB_HOME>/lib/ext,其中 WESB_HOME 是 WebSphere ESB 的安装目录。

配置 MQ Queue Connection Factory

  1. 打开 WebSphere Process Server 管理控制台
  2. 导航到 Resources => JMS Providers => WebSphere MQ => WebSphere MQ queue connection factories => MQHeaderTest_QCF => Custom properties
  3. 选中现有属性 SENDEXIT,用 com.ibm.test.mqexit.MQExitImp 替换当前值:
    图 9. MQ Queue Connection Factory 自定义属性
    MQ Queue Connection Factory 自定义属性

测试模块

使用通用的测试客户端来测试模块:

  1. 右键单击 MQHeaderTest 中介组件,并选择 Test component
  2. 消息的正文并不重要,所以单击 Continue
  3. 在 Deployment Location 窗口,选择 WebSphere ESB Server 并单击 Finish。 服务器将启动,并且应用程序将被部署和调用。完成组件测试后,您将看到响应:
    图 10. 测试组件
    测试组件
  4. 使用 WebSphere MQ Explorer,右键单击 Send 队列并选择 Browse Messages
  5. 双击消息,您将会看到在中介组件中设置的应用程序标识数据:
    图 11. MQMessage
    MQMessage

以下是跟踪日志:

清单 3. 跟踪日志
[15/07/10 15:47:05:110 EST] 00000047 SystemOut     O   >>>>> 
Queue connection input = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F44202000000001000000052020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000000020
[15/07/10 15:47:05:110 EST] 00000047 SystemOut     O   >>>>> 
remoteUserId = Administrator
[15/07/10 15:47:05:110 EST] 00000047 SystemOut     O   >>>>> 
Original Options = 32
[15/07/10 15:47:05:515 EST] 00000047 SystemOut     O   >>>>> 
Queue connection input = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F44202000000001000000052020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000000020
[15/07/10 15:47:05:515 EST] 00000047 SystemOut     O   >>>>> 
remoteUserId = Administrator
[15/07/10 15:47:05:515 EST] 00000047 SystemOut     O   >>>>> 
Original Options = 32
[15/07/10 15:47:05:671 EST] 00000047 SystemOut     O   >>>>> 
Queue connection input = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F4420200000000100000001736D62496E202020202020202020202020202020202020202020202020202020
20202020202020202020202020202020514D5F69626D5F6C3374303534382020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000002010
[15/07/10 15:47:05:671 EST] 00000047 SystemOut     O   >>>>> 
remoteUserId = Administrator
[15/07/10 15:47:05:671 EST] 00000047 SystemOut     O   >>>>> 
Original Options = 8208
[15/07/10 15:47:05:718 EST] 00000047 SystemOut     O   >>>>> 
New Options = 00002410
[15/07/10 15:47:05:718 EST] 00000047 SystemOut     O   >>>>> 
Queue connection output = 
54534820000000D80183300000000000000000000000011103520000000000D8000000000000000000000000
4F4420200000000100000001736D62496E202020202020202020202020202020202020202020202020202020
20202020202020202020202020202020514D5F69626D5F6C3374303534382020202020202020202020202020
2020202020202020202020202020202020202020414D512E2A20202020202020202020202020202020202020
20202020202020202020202020202020202020202020202020202020202020202020202000002410
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
Queue data input = 
545348200000023901863000000000000000000000000111035200000000023900000000000000000071FA40
4D442020000000020000000000000008FFFFFFFF0000000000000111000004B82020202020202020FFFFFFFF
0000000200000000000000000000000000000000000000000000000000000000000000000000000000000000
0000000000000000000000002020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020000000000000000000000000
0000000000000000000000000000000000000000202020202020202020202020202020202020202020202020
2020202020202020000000002020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202000000000000000000000000000000000000000000000000000000001
0000000000000000FFFFFFFF504D4F200000000100002002FFFFFFFF00000000000000000000000000000001
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
20202020202020200000001D3036546573744944546869732069732061207465737420737472696E67
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
remoteUserId = Administrator
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
Original Options = 8194
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
appIDLen = 6
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
New Options = 00002402
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
AppID = 546573744944
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
Original dataLength = 29
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
New dataLength = 00000015
[15/07/10 15:47:05:873 EST] 00000047 SystemOut     O   >>>>> 
Queue data output = 54534820000002390186300000000000000000000000011103520000000002390000
0000000000000071FA404D442020000000020000000000000008FFFFFFFF0000000000000111000004B82020
202020202020FFFFFFFF00000002000000000000000000000000000000000000000000000000000000000000
0000000000000000000000000000000000000000000020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020200000
0000000000000000000000000000000000000000000000000000000000005465737449442020202020202020
2020202020202020202020202020202020200000000020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020000000000000000000000000000000000000
000000000000000000010000000000000000FFFFFFFF504D4F200000000100002402FFFFFFFF000000000000
0000000000000000000120202020202020202020202020202020202020202020202020202020202020202020
2020202020202020202020202020202020202020202020202020202020202020202020202020202020202020
20202020202020202020202020202020202000000015546869732069732061207465737420737472696E67

结束语

本文介绍了如何使用自定义 MQ SendExit 来填充本地 MQ 应用程序所需的 MQ 标头。


下载

描述名字大小
代码样例MQHeaderTestPI.zip32 KB

参考资料

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere
ArticleID=830868
ArticleTitle=使用自定义 MQ SendExit 在 WebSphere Enterprise Service Bus 中填充 MQ 标头
publish-date=08172012