级别: 中级 许礼兵 (xulibing@cn.ibm.com), 软件工程师, IBM中国软件开发中心
2008 年 1 月 04 日 目前针对MQ的消息处理程序在很多企业应用很广泛,在作者最近给予的一些客户的现场技术支持中碰到了一些关于MQ编程使用的常见问题。做出一些总结供大家参考,以避免以后在开发中遇到类似的问题。
1 概述
WebSphere MQ是基于消息队列(Message Queuing)或消息传送(Message passing)的中间件,主要功能是在应用程序之间传送消息,这些消息可以在不同的网络协议、不同的计算机系统和不同的应用软件之间传递。通过使用WebSphere MQ用户可以简单方便的开发出可靠、高效的分布式应用系统,正符合目前火热的SOA技术。
针对MQ消息的编程时下应用最广泛的就是JMS编程了,那么怎么完善您的消息处理机制,使得整个应用系统成为一个健壮的系统是非常重要的,目前在很多企业应用中并没有完全考虑到这一点。当然,我们这篇文章主要讨论的是针对WebSphere Application Server上使用JMS处理WebSphere MQ消息的机制,对于整个企业应用的高可用性和容错性不在这里讨论。让我们先看一下目前客户应用中的消息处理程序经常碰到的问题
2 消息侦听处理的常见问题
图1:常见的消息侦听处理机制
上面是目前常见的消息侦听处理结构,在这种情况下编程处理简单,updateData()是假定的消息驱动Bean调用的一个方法。和消息驱动Bean的onMessage(javax.jms.Message msg)方法一起构成了消息侦听处理的业务逻辑。这种结构默认所有情况都比较理想,这样在理想情况下,系统运行正常。如果发生消息处理异常(包括业务处理方法时异常和消息驱动Bean的 onMessage(javax.jms.Message msg)方法里其他代码的异常)超过侦听端口的最大重试次数(系统默认是5次),侦听端口就会挂起,在没有人工干预的情况下那么消息不再被处理。消息侦听停止,消息处理模块停止工作,那么您的消息侦听处理程序就崩溃了,这样严重影响业务的处理开展,甚至造成定单丢失等。
下面我们先解释一下消息侦听的机制,消息侦听处理的最重要的组件就是消息、侦听端口和消息侦听程序(大部分是消息驱动Bean),侦听端口是用来侦听放在WebSphere MQ消息队列的一个WebSphere Application Server 的一个组件。当消息队列里有消息时候,消息侦听端口侦听到消息队列的深度变化,绑定到该消息侦听端口的消息驱动Bean即调用它的onMessage(javax.jms.Message msg)方法,读取消息队列里的消息,在onMessage()方法运行完成后,EJB事务提交,消息队列里的消息即被消费掉。onMessage()方法读取下一条消息。
消息侦听端口有3个重要的参数最大重试次数,最大会话数和最大消息数。最大会话数是WebSphere Application Server 能够实例化该消息侦听端口的最大的个数,可以简单的理解为最大的消息驱动Bean的实例个数,最大重试次数是该消息侦听端口在消息驱动Bean消费这条消息时出现异常(即消息驱动Bean的onMessage()方法出现异常)时重试消费这条消息的次数。当超过重试次数时,消息侦听端口会被停止,消息侦听程序停止处理消息。消息在没有人工干预的情况下不再能够被读取到。
2.1 常见的编程
我们不讲述前端存放消息的编程处理,主要看在消息驱动Bean这块代码的编写。很多开发者在开发消息侦听处理的业务模块时,不考虑到消息侦听端口和错误消息以及业务逻辑异常的问题。如下面的代码:
常见JMS编程
/****************消息处理业务逻辑代码************************************/
public void onMessage(javax.jms.Message msg) {
try{
updateData();
/******************/
Other process code
}catch(Exception e){
System.out.println("----------------------mdb process error------------------");
e.printStackTrace();
}
}
/************************** 数据库处理 **********************/
private void updateData() throw Exception {
try {
Connection con=getConnection();
String updateStatement ="update pub_msg_his set DEAL_FLAG='1' where MSG_ID=?";
PreparedStatement prepStmt = con.prepareStatement(updateStatement);
prepStmt.setString(1,corid);
prepStmt.execute();
prepStmt.close();
releaseConnection();
} catch (Exception ex) {
ex.getMessage());
}
}
private void getConnection() {
try {
InitialContext ic = new InitialContext();
DataSource ds = (DataSource) ic.lookup("java:comp/env/jdbc/mdb");
con = ds.getConnection();
} catch (Exception ex) {
throw new EJBException("getConnection: " + ex.getMessage());
}
}
private void releaseConnection() {
try {
con.close();
} catch (SQLException ex) {
throw new EJBException("releaseConnection: " + ex.getMessage());
}
}
|
2.2 常见编程存在的问题
上面的代码在理想情况下会运行得很好,updateData()是消息驱动Bean调用的一个方法,涉及到客户应用的业务逻辑。默认所有情况都比较理想,这样在理想情况下,系统运行正常。如果发生消息处理异常(包括updateData()异常和消息驱动Bean onMessage()方法里其他代码的异常)超过侦听端口的最大重试次数(系统默认是5次),侦听端口就会挂起,在没有人工干预的情况下那么消息不再被处理。造成这些异常的情况有多种,比如数据库宕机,消息是有害消息等等,一些客观存在的情况,所以我们就要考虑怎么去避免这种问题了。
而且在上面的代码中,updateData()没有将他捕获的异常扔出来,只是简单的将异常打印出来。在onMessage()方法里也没有将事务回滚。或者有的编程人员干脆就是简单的捕获异常不做任何处理。那么这样的消息处理机制就不是一个健壮的消息处理机制。为了解决这个问题,结合在项目中的一些经验,对以上的编程和配置做以下的改进。
小提示:有害消息就是接收该消息的 MDB 应用程序无法处理的消息。该消息可能是已损坏的或是以非预期格式表示的消息。例如,假设您有一个处理 TextMessage 类型 JMS 消息的 MDB。如果消息侦听器服务传递了一个具有不同消息类型的消息,则该消息被认为是有害消息。
遇到有害消息可以做以下三件事之一:
1.将消息回滚到它来自的队列。如果该 MDB 在一个事务中运行,并且确保该消息未丢失,则可以完成此操作。为此,MDB 必须在与它相关的消息驱动上下文中调用 setRollbackOnly() 方法。
2.将消息移动到不同的队列。该方法对于 MDB 没有在一个事务中运行时特别有用,因为该方法能防止有害消息丢失。
3.无需做任何事,丢弃该消息。这表示该消息永远消失。
3 改进的消息处理机制
图2:健壮的消息侦听处理机制
上面是我们结合WebSphere MQ消息队列的死信处理机制和WebSphere Application Server的消息侦听端口重试机制改进的消息侦听处理结构。在这种结构下,无论是消息格式错误或者其他资源错误,消息处理程序都能将消息回滚到原队列,在消息处理程序重试一定次数后将消息放入指定的死信队列,这样就能保证消息不被丢失,消息侦听程序也会正常的工作,侦听端口不会被挂起。保证了消息侦听处理程序的健壮可用。为了实现上述的处理机制,先要准备我们的MQ环境和WebSphere Application Server环境。
3.1 环境准备
1、在WAS管理控制台,指定消息侦听端口的重试次数。
2、修改队列的回退阀值和回退队列,使消息侦听端口的重试次数大于队列的回退阀值。
指定队列的死信队列:
define qlocal('processDeadQ') MAXDEPTH(100000)
alter qlocal('processQ') BOQNAME('processQ') BOTHRESH(4)
也可通过WebSphere MQ资源管理器来指定死信队列以及队列的回退阀值。
关于怎样消除有害消息,保障消息侦听端口的可用性可以参考以下文档:
http://www.ibm.com/developerworks/cn/websphere/library/techarticles/0405_titheridge/0405_titheridge.html
在准备好环境之后,再修改我们的消息处理程序代码,使其达到我们预期的目的,保证程序的可持续运行。
3.2 代码改进
1、为了避免出现消息丢失和侦听停止的问题,对MDB代码做以下修改:MDB 的ONMESSAGE()方法里代码添加事务回滚(如下),以保证消息在业务处理过程中出现异常能够将事务回退,实现对事务的两阶段提交。
改进的MDB的onMessage()方法
try {
if (msg instanceof ObjectMessage) {
ObjectMessage message = (ObjectMessage) msg;
Object obj = message.getObject();
StaffActivity activity = (StaffActivity) obj;
OutAction action = new OutAction();
action.sendMessage(activity);
}
}catch(TemplateException te){
System.out.println("消息格式不对,非系统异常,丢弃该消息");
} catch (Exception e) {
e.printStackTrace();
getMessageDrivenContext().setRollbackOnly();
}
|
在ONMESSAGE()里调用的外部方法一定要抛出异常被消息处理程序捕获到,这样才能在出现异常时对资源进行回滚,保证事务的完整性。
2、注意对onMessage方法调用的外部业务逻辑代码updateData()的修改
对于onMessage方法外部调用的代码,我们一定要保证onMessage方法里能够捕获到在外部方法里的异常,所以我们一定要在外部方法里出现异常时抛出异常以便被onMessage()方法捕获。如下:
改进的外部方法
public void updateData(StaffActivity sa) throws Exception {
// coming here
logger.entering("OutAction ","sendMessage(StaffActivity sa)");
logger.info("The input Message:"+sa.toString());
OutMessage outMessage = new OutMessage(sa);
try {
MmsMessage msg = outMessage.buildMessge();
} catch (IOException e) {
logger.throwing("OutAction", "sendMessage()", e);
throw e;
} catch (TemplateException e) {
logger.throwing("OutAction", "sendMessage()", e);
throw e;
}
logger.exiting("OutAction ","sendMessage(StaffActivity sa)");
}
|
这样我们就完成了一个高可靠的消息侦听程序,在SOA日益被广泛使用的今天,用好消息处理作为SOA应用的重要实现方法会起到事半功倍的效果。
参考资料
关于作者  | |  | 许礼兵,软件工程师,服务于IBM中国软件开发中心,是 WebSphere Application Server、WebSphere Portal Server、WebSphere MQ、WBI Server Foundation 等企业整合软件方面的专家,具有丰富的 J2EE 应用设计、开发和移植经验。 |
对本文的评价
|