IBM®
跳转到主要内容
    中国 [选择]    使用条款
 
 
Select a scope: Search for:    
    首页    产品    服务与解决方案     支持与下载    个性化服务    
跳转到主要内容

developerWorks 中国  >  WebSphere  >

在WAS上开发基于WebSphere MQ的JMS消息侦听处理程序

developerWorks
文档选项

未显示需要 JavaScript 的文档选项

讨论


级别: 中级

许礼兵 (xulibing@cn.ibm.com), 软件工程师, IBM中国软件开发中心

2008 年 1 月 04 日

目前针对MQ的消息处理程序在很多企业应用很广泛,在作者最近给予的一些客户的现场技术支持中碰到了一些关于MQ编程使用的常见问题。做出一些总结供大家参考,以避免以后在开发中遇到类似的问题。

1 概述

WebSphere MQ是基于消息队列(Message Queuing)或消息传送(Message passing)的中间件,主要功能是在应用程序之间传送消息,这些消息可以在不同的网络协议、不同的计算机系统和不同的应用软件之间传递。通过使用WebSphere MQ用户可以简单方便的开发出可靠、高效的分布式应用系统,正符合目前火热的SOA技术。

针对MQ消息的编程时下应用最广泛的就是JMS编程了,那么怎么完善您的消息处理机制,使得整个应用系统成为一个健壮的系统是非常重要的,目前在很多企业应用中并没有完全考虑到这一点。当然,我们这篇文章主要讨论的是针对WebSphere Application Server上使用JMS处理WebSphere MQ消息的机制,对于整个企业应用的高可用性和容错性不在这里讨论。让我们先看一下目前客户应用中的消息处理程序经常碰到的问题

2 消息侦听处理的常见问题


图1:常见的消息侦听处理机制
图1:常见的消息侦听处理机制

上面是目前常见的消息侦听处理结构,在这种情况下编程处理简单,updateData()是假定的消息驱动Bean调用的一个方法。和消息驱动Bean的onMessage(javax.jms.Message msg)方法一起构成了消息侦听处理的业务逻辑。这种结构默认所有情况都比较理想,这样在理想情况下,系统运行正常。如果发生消息处理异常(包括业务处理方法时异常和消息驱动Bean的 onMessage(javax.jms.Message msg)方法里其他代码的异常)超过侦听端口的最大重试次数(系统默认是5次),侦听端口就会挂起,在没有人工干预的情况下那么消息不再被处理。消息侦听停止,消息处理模块停止工作,那么您的消息侦听处理程序就崩溃了,这样严重影响业务的处理开展,甚至造成定单丢失等。

下面我们先解释一下消息侦听的机制,消息侦听处理的最重要的组件就是消息、侦听端口和消息侦听程序(大部分是消息驱动Bean),侦听端口是用来侦听放在WebSphere MQ消息队列的一个WebSphere Application Server 的一个组件。当消息队列里有消息时候,消息侦听端口侦听到消息队列的深度变化,绑定到该消息侦听端口的消息驱动Bean即调用它的onMessage(javax.jms.Message msg)方法,读取消息队列里的消息,在onMessage()方法运行完成后,EJB事务提交,消息队列里的消息即被消费掉。onMessage()方法读取下一条消息。

消息侦听端口有3个重要的参数最大重试次数,最大会话数和最大消息数。最大会话数是WebSphere Application Server 能够实例化该消息侦听端口的最大的个数,可以简单的理解为最大的消息驱动Bean的实例个数,最大重试次数是该消息侦听端口在消息驱动Bean消费这条消息时出现异常(即消息驱动Bean的onMessage()方法出现异常)时重试消费这条消息的次数。当超过重试次数时,消息侦听端口会被停止,消息侦听程序停止处理消息。消息在没有人工干预的情况下不再能够被读取到。

2.1 常见的编程

我们不讲述前端存放消息的编程处理,主要看在消息驱动Bean这块代码的编写。很多开发者在开发消息侦听处理的业务模块时,不考虑到消息侦听端口和错误消息以及业务逻辑异常的问题。如下面的代码:


常见JMS编程
                
/****************消息处理业务逻辑代码************************************/
public void onMessage(javax.jms.Message msg) {
  try{
	updateData();
	/******************/
	Other process code
  }catch(Exception e){
	System.out.println("----------------------mdb process error------------------");
	e.printStackTrace();
  }       
}
/************************** 数据库处理 **********************/
private void updateData() throw Exception {
  try {
	Connection con=getConnection();
	String updateStatement ="update pub_msg_his set DEAL_FLAG='1' where MSG_ID=?";
	PreparedStatement prepStmt = con.prepareStatement(updateStatement);
	prepStmt.setString(1,corid);
	prepStmt.execute();
	prepStmt.close();
	releaseConnection();
  } catch (Exception ex) {
	ex.getMessage());
  }
}

private void getConnection() {
  try {
	InitialContext ic = new InitialContext();
	DataSource ds = (DataSource) ic.lookup("java:comp/env/jdbc/mdb");
	con = ds.getConnection();
  } catch (Exception ex) {
	throw new EJBException("getConnection: " + ex.getMessage());
  }
}
private void releaseConnection() {
  try {
	con.close();
  } catch (SQLException ex) {
	throw new EJBException("releaseConnection: " + ex.getMessage());
  }
}

2.2 常见编程存在的问题

上面的代码在理想情况下会运行得很好,updateData()是消息驱动Bean调用的一个方法,涉及到客户应用的业务逻辑。默认所有情况都比较理想,这样在理想情况下,系统运行正常。如果发生消息处理异常(包括updateData()异常和消息驱动Bean onMessage()方法里其他代码的异常)超过侦听端口的最大重试次数(系统默认是5次),侦听端口就会挂起,在没有人工干预的情况下那么消息不再被处理。造成这些异常的情况有多种,比如数据库宕机,消息是有害消息等等,一些客观存在的情况,所以我们就要考虑怎么去避免这种问题了。

而且在上面的代码中,updateData()没有将他捕获的异常扔出来,只是简单的将异常打印出来。在onMessage()方法里也没有将事务回滚。或者有的编程人员干脆就是简单的捕获异常不做任何处理。那么这样的消息处理机制就不是一个健壮的消息处理机制。为了解决这个问题,结合在项目中的一些经验,对以上的编程和配置做以下的改进。

小提示:有害消息就是接收该消息的 MDB 应用程序无法处理的消息。该消息可能是已损坏的或是以非预期格式表示的消息。例如,假设您有一个处理 TextMessage 类型 JMS 消息的 MDB。如果消息侦听器服务传递了一个具有不同消息类型的消息,则该消息被认为是有害消息。

遇到有害消息可以做以下三件事之一:

1.将消息回滚到它来自的队列。如果该 MDB 在一个事务中运行,并且确保该消息未丢失,则可以完成此操作。为此,MDB 必须在与它相关的消息驱动上下文中调用 setRollbackOnly() 方法。

2.将消息移动到不同的队列。该方法对于 MDB 没有在一个事务中运行时特别有用,因为该方法能防止有害消息丢失。

3.无需做任何事,丢弃该消息。这表示该消息永远消失。

3 改进的消息处理机制


图2:健壮的消息侦听处理机制
图2:健壮的消息侦听处理机制

上面是我们结合WebSphere MQ消息队列的死信处理机制和WebSphere Application Server的消息侦听端口重试机制改进的消息侦听处理结构。在这种结构下,无论是消息格式错误或者其他资源错误,消息处理程序都能将消息回滚到原队列,在消息处理程序重试一定次数后将消息放入指定的死信队列,这样就能保证消息不被丢失,消息侦听程序也会正常的工作,侦听端口不会被挂起。保证了消息侦听处理程序的健壮可用。为了实现上述的处理机制,先要准备我们的MQ环境和WebSphere Application Server环境。

3.1 环境准备

1、在WAS管理控制台,指定消息侦听端口的重试次数。

2、修改队列的回退阀值和回退队列,使消息侦听端口的重试次数大于队列的回退阀值。

指定队列的死信队列:

define qlocal('processDeadQ') MAXDEPTH(100000)

alter qlocal('processQ') BOQNAME('processQ') BOTHRESH(4)

也可通过WebSphere MQ资源管理器来指定死信队列以及队列的回退阀值。

关于怎样消除有害消息,保障消息侦听端口的可用性可以参考以下文档:

http://www.ibm.com/developerworks/cn/websphere/library/techarticles/0405_titheridge/0405_titheridge.html

在准备好环境之后,再修改我们的消息处理程序代码,使其达到我们预期的目的,保证程序的可持续运行。

3.2 代码改进

1、为了避免出现消息丢失和侦听停止的问题,对MDB代码做以下修改:MDB 的ONMESSAGE()方法里代码添加事务回滚(如下),以保证消息在业务处理过程中出现异常能够将事务回退,实现对事务的两阶段提交。


改进的MDB的onMessage()方法
                
try {
	if (msg instanceof ObjectMessage) {
		ObjectMessage message = (ObjectMessage) msg;
		Object obj = message.getObject();
		StaffActivity activity = (StaffActivity) obj;
		OutAction action = new OutAction();
		action.sendMessage(activity);
	}
  }catch(TemplateException te){
	System.out.println("消息格式不对,非系统异常,丢弃该消息");	
  } catch (Exception e) {
	e.printStackTrace();
	getMessageDrivenContext().setRollbackOnly();
}

在ONMESSAGE()里调用的外部方法一定要抛出异常被消息处理程序捕获到,这样才能在出现异常时对资源进行回滚,保证事务的完整性。

2、注意对onMessage方法调用的外部业务逻辑代码updateData()的修改

对于onMessage方法外部调用的代码,我们一定要保证onMessage方法里能够捕获到在外部方法里的异常,所以我们一定要在外部方法里出现异常时抛出异常以便被onMessage()方法捕获。如下:


改进的外部方法
                
public void updateData(StaffActivity sa) throws Exception {
// coming here
	logger.entering("OutAction ","sendMessage(StaffActivity sa)");
	logger.info("The input Message:"+sa.toString());
	OutMessage outMessage = new OutMessage(sa);
	try {
		MmsMessage msg = outMessage.buildMessge();			
	} catch (IOException e) {			
		logger.throwing("OutAction", "sendMessage()", e);
		throw e;
	} catch (TemplateException e) {			
		logger.throwing("OutAction", "sendMessage()", e);
		throw e;
	}
		logger.exiting("OutAction ","sendMessage(StaffActivity sa)");
}

这样我们就完成了一个高可靠的消息侦听程序,在SOA日益被广泛使用的今天,用好消息处理作为SOA应用的重要实现方法会起到事半功倍的效果。



参考资料



关于作者

许礼兵,软件工程师,服务于IBM中国软件开发中心,是 WebSphere Application Server、WebSphere Portal Server、WebSphere MQ、WBI Server Foundation 等企业整合软件方面的专家,具有丰富的 J2EE 应用设计、开发和移植经验。




对本文的评价










回页首


IBM 公司保留在 developerWorks 网站上发表的内容的著作权。未经IBM公司或原始作者的书面明确许可,请勿转载。如果您希望转载,请通过 提交转载请求表单 联系我们的编辑团队。
    关于 IBM 隐私条约 联系 IBM 使用条款