内容


使用 Apache Geronimo 和 JMS 构建事件驱动的框架

使用 Java 反射定义事件类型

当实时更改和事件发生时,对其作出响应是企业框架的重要需求。本文介绍了 Apache Geronimo 框架中采用的技术和机制,它们使应用程序和服务能够有效地响应实时刺激,然后跨架构虚拟层发送和接收事件。

开发人员试图使用传统顺序处理方法设计和构建动态工作流和集成系统时会遇到麻烦,因此,急需更适合的、事件感知的技术和工具。SOA 和事件驱动的编程可以解决这一复杂的难题。

SOA 给出一个松散耦合的开发模型和运行时环境。它使服务提供者和服务消费者能够使用动态组件交互来构建交互模型,这些交互模型能够利用该开发模型灵活性和强大功能。事件驱动的交互模型比传统同步机制能更及时地对动态事件作出响应,部分原因是 SOA 中事件驱动的编程利用分布式系统本身需要的许多相同特性,包括专门化、模块化和适应性。

事件驱动的架构

2003 年,Gartner Group 引入事件驱动的架构 (EDA) 作为一种构建系统、服务和应用程序的方法,在这些所构建的东西中,事件在松散耦合的事件接收者之间路由。事件驱动的系统由事件生产者事件接收者 组成。事件生产者可以将事件发布到事件通道,后者可以将事件分发到订阅事件的接收者。 与生产者发布事件一样,事件通道将事件转发给接收者。如果没有可用的接收者,事件通道会将事件存储起来,然后将其转发到稍后可用的接收者。此过程称为存储和转发

EDA 使用传递消息概念作为两个或多个实体之间交互的方法。通过触发对应于某些业务领域事件的信号和消息,来启动交互。当每个给定事件发生时,会通知该事件的所有订阅者。然后订阅者可以对事件采取行动。

EDA 受益于以下属性:

  • 无耦合的关联:事件发布者和事件订阅者预先无需知道彼此的存在。
  • 多对多的交互:一个或多个事件会影响一个或多个订阅者。
  • 基于事件的控制流程:当应用程序响应发生的事件时,应用程序流程非常自然。
  • 异步消息传递:业务逻辑可以随事件同时发生。

通过围绕 EDA 构建应用程序和系统,您可以用使其更具响应性的方式构建它们,因为通过设计,事件驱动系统更适用于不可预知且不断更改的环境。

事件驱动设计和开发的优点

事件驱动的编程有许多优点。例如,此类编程可以:

  • 减少开发和维护分布式系统的复杂性。
  • 使得应用程序和服务的装配和配置更加容易且成本更低。
  • 促进源代码和组件重用,从而减少 bug 并促进敏捷的开发和部署。

短期内,事件驱动的设计和开发允许更加快速、容易的定制。长期内,系统状态更加精确。

EDA 和 SOA 的结合

与顺序式或过程式系统中客户机必须轮询更改请求不同,EDA 允许系统和组件在事件发生时实时动态地作出响应。EDA 通过引入长时间运行的处理功能来弥补 SOA 的不足。因为事件消费者在事件发生时接收事件,并且会调用松散耦合的服务来向客户提供更及时更精确的数据,所以对业务有益。

在 EDA 内部,您可以跨 SOA 的各个分段(包括物理层和架构的虚拟层)传输事件,这样系统可以有效地作出响应。图 1 说明了跨架构堆栈各层传播的事件。

图 1. 跨虚拟层的传播
跨虚拟层的传播
跨虚拟层的传播

正如您看到的,事件可以因应用程序、业务、组件、平台或系统层的任何更改而发生,从技术观点上讲,业务事件的级别自然要比系统事件或组件事件高。

事件的原因(事件因果关系)是理解该事件的重要因素。事件因果关系可分为水平因果关系垂直因果关系。当事件发布者与事件接收者位于架构虚拟层中的同一层时,发生水平因果关系。当事件发布者与事件接收者位于不同层时,发生垂直因果关系。

EDA 和事件队列

事件驱动的编程是围绕事件生产者和事件消费者之间的无耦合关系的概念构造的。也就是说,事件消费者不关心事件发生的地点或原因;而是关注事件发生后它(消费者)将被调用。将事件生产者与事件消费者隔离开来的系统和应用程序通常依赖于事件分配器或通道。此通道包含事件队列,用作事件生产者和事件处理程序之间的中间层。

图 2 说明了生产者、消费者、事件通道和主题(或队列)之间的关系。

图 2. 事件队列
事件队列
事件队列

事件队列的角色是存储从生产者接收的事件,并在每个消费者可用时将这些事件传输给消费者 —— 通常是为了事件被接收。

事件队列和主题

多数事件驱动的系统依赖于预先构建的事件队列技术,如面向消息的中间件(Message-Oriented Middleware,MOM)框架。MOM 是一种基于消息队列的异步消息模型框架。

MOM 框架的主要优点是它能够无限期地存储消息,并在消费者准备接收消息时,将其路由到消费者。MOM 按照以下消息模型工作:

  • 点对点:此模型基于称为队列 的消息库,在该模型中,可以将消息从一个或多个生产者发送给单个消费者。
  • 发布/订阅:此模型基于称为主题 的消息库,在该模型中,可以将消息从一个或多个生产者发布给一个或多个已订阅的消费者。

图 3 说明了一个发布者、事件通道、主题和多个订阅了给定消息类型的消费者之间的交互。

图 3. 一个发布者、多个订阅者、事件通道和主题之间的交互
一个发布者、多个订阅者、事件通道和主题之间的交互
一个发布者、多个订阅者、事件通道和主题之间的交互

Java 消息服务 (JMS) 框架是 Java 应用程序编程接口 (API) 在 MOM 模型上的抽象。

在 EDA 中使用 JMS

Java 技术为 Java 程序提供 JMS 作为一种普通方法,用于创建、发送、接收和读取消息。JMS 是大多数消息传递系统中的常见概念和语义的接口和类抽象的框架。

通过 JMS 接口,消息生产者和消费者能够以点对点或发布/订阅模型发送和接收消息。下面的列表显示了 JMS 中的主要组件:

  • ConnectionFactory:该对象用于创建 JMS 连接
  • Connection:这是到 JMS 系统的连接
  • Destination:消息主题或消息队列的抽象
  • Session:发送或接收消息所在的上下文
  • MessageProducer:会话创建的组件,用于将消息发送到目的地
  • MessageConsumer:会话创建的组件,用于从目的地接收消息

使用 Geronimo 和 JMS 的简单事件框架

Apache Geronimo 与 Active MQ 开放源码消息提供程序绑定在一起。Active MQ 支持 JMS,因此为围绕 Geronimo 框架构建的应用程序提供一种方法来以充分利用 JMS 的消息传递功能。

以下各节定义了使用 Geronimo、Active MQ 和 JMS 的概念和语义构建的简单事件框架。在这些小节中定义的事件框架包括事件通道、事件发布者和事件接收者。事件通道负责注册和取消注册事件接收者,并负责将事件消息从事件发布者以匿名方式路由到事件接收者。此框架给出的惟一概念是事件通道功能,即根据事件对象实现的 Java 类或接口的类型,来过滤消息,并将消息路由到适当的接收者。

使用类/接口层次结构过滤并路由事件

典型的消息传递系统允许消息订阅者根据点号分隔的字符串(如 travel.flights.seatstravel.lodging.rates )来定义将接收的事件类型。本文中给出的事件框架还允许订阅者订阅特定类型的事件;不过,该事件类型由 Java 类和接口的层次结构定义。

事件框架可以表示为点号分隔的消息类型层次结构,如 图 4 所示的类层次结构。

图 4. 事件应用程序类关系
事件应用程序类关系
事件应用程序类关系

根据此图表,订阅 Event 接口所代表的事件的事件接收者将接收所有事件,而订阅 FlightEvent 接口所代表的事件的事件接收者将只接收基于该接口或 FlightDelayed 类或 SeatAvailable 类的事件。此设计允许事件接收者一次订阅多个事件类型。例如,事件接收者通过使用参数 Event.class 调用事件通道的 subscribe() 方法,可以订阅所有事件。如果添加新的事件类型,则事件接收者会在它们发布时自动接收它们。

事件通道通过确定事件接收者订阅的事件接口的最具体的子类型,来处理事件层次结构。例如,清单 1 所示的 FlightDelayed 类实现了 FlightEvent 接口;因此,事件通道将首先查找 FlightDelayed 类的订阅者,然后查找 FlightEvent 接口等,一直沿着类/接口层次结构向上。

清单 1. FlightDelayed 类
class TravelEvent extends Event {}
      class FlightEvent extends TravelEvent {}
      class LodgingEvent extends TravelEvent {}
      
      public class FlightDelayed
        implements FlightEvent
      {
        private String message = "";
      
        public FlightDelayed()
        {
        }
      
        public FlightDelayed(String message)
        {
          this.message = message;
        }
      
        public String getMessage()
        {
          return message;
        }
      
        public void setMessage(String message)
        {
          this.message = message;
        }
      }

事件通道

事件通道 是事件发布者用于发布事件,事件接收者用于订阅和接收事件的组件。简单事件通道的接口如 清单 2 所示。

清单 2. 简单事件通道的接口
public interface Channel
      {
        void start();
      
        void stop();
      
        void publish(final Event event);
      
        void subscribe(final Receiver receiver,
                       final Class eventClass);
      
        void unsubscribe(final Receiver receiver,
                         final Class eventClass);
      }

注意,subscribe()unsubscribe() 方法需要一个 Class 类型的参数,事件通道使用该参数来确定接收者将订阅或取消订阅哪些类型的事件。

为避免事件接收者轮询事件何时发生,可以通过 Receiver 接口的 receive() 方法调用事件接收者。每当将事件发布到事件通道时,都可以使用 Java 反射确定哪些订阅者将接收该事件。然后在这些对象上调用 receive() 方法。 清单 3 显示了一个简单事件接收者。

清单 3. 一个简单事件接收者的实现
public class EventReceiver
        implements Receiver
      {
        private static final transient Log log =
          LogFactory.getLog(EventReceiver.class);
      
        private String id = "";
      
        public EventReceiver()
        {
        }
      
        public EventReceiver(String id)
        {
          this.id = id;
        }
      
        public void setId(String id)
        {
          this.id = id;
        }
      
        public String getId()
        {
          return id;
        }
      
        public void receive(final Event event)
        {
          log.info("EventReceiver [" + id
                   + "] received event [" + event.getMessage() + "]");
        }
      }

清单 4 显示了事件通道的一个摘录。

清单 4. 事件通道的实现
public class EventChannel
        implements Channel
      {
        private static final String TOPIC_NAME =
          "java:comp/env/EventTopic";
      
        private static final String MQ_URL = "tcp://localhost:61616";
           
        private HashMap subscribers = new HashMap();
        private TopicConnectionFactory factory = null;
        private Topic eventTopic = null;
        private TopicConnection topicConn = null;
        private TopicSession topicSess = null;
        private TopicSubscriber topicSubscriber = null;
        private TopicPublisher topicPublisher = null;
        private EventConsumer eventConsumer = null;
      
        private void handleEvent(Event event)
        {
          final Set received = new HashSet();
      
          for (Class eventClass = event.getClass();
               Event.class.isAssignableFrom(eventClass);
               eventClass = eventClass.getSuperclass())
          {
            ArrayList receiverList = new ArrayList();
            getReceiversForEvent(getEventLeafInterface(eventClass),
                                 receiverList);
            Receiver[] receivers = new Receiver[receiverList.size()];
            receiverList.toArray(receivers);
            for (int i = 0; i < receivers.length; i++)
            {
              invokeOnce(received, receivers[i], event);
            }
          }
        }
      
        private void invokeOnce(Set received,
                                Receiver receiver,
                                Event event)
        {
          received.add(receiver);
          receiver.receive(event);
        }
        
        private Class getEventLeafInterface(Class cls)
        {
          Class retVal = null;
      
          if (Event.class.isAssignableFrom(cls))
          {
            retVal = cls;
            if (cls.isInterface())
            {
              return retVal;
            }
          }
      
          Class[] interfaces = cls.getInterfaces();
          if (interfaces != null)
          {
            for (int i = 0; i < interfaces.length; i++)
            {
              if (Event.class.isAssignableFrom(interfaces[i]))
              {
                retVal = interfaces[i];
                break;
              }
              retVal = getEventLeafInterface(interfaces[i]);
            }
          }
      
          return retVal;
        }
      
        public void start()
        {
          try
          {
            factory = new ActiveMQConnectionFactory(MQ_URL);
            topicConn = factory.createTopicConnection();
            topicSess =
              topicConn.createTopicSession(false,
                                           Session.AUTO_ACKNOWLEDGE);
            eventTopic = topicSess.createTopic(TOPIC_NAME);
            topicSubscriber = topicSess.createSubscriber(eventTopic);
            topicPublisher = topicSess.createPublisher(eventTopic);
      
            eventConsumer = new EventConsumer(this);
            Thread consumerThread = new Thread(eventConsumer);
            consumerThread.setDaemon(false);
            consumerThread.start();
          }
          catch (Exception e)
          {
            e.printStackTrace();
          }
        }
      
        public void stop()
        {
          // close topic connections, sessions, consumers, etc.
        }
      
        public void publish(final Event event)
        {
          try
          {
            ObjectMessage eventMessage = topicSess.createObjectMessage();
            eventMessage.setObject(event);
      
            topicPublisher.publish(eventMessage);
          }
          catch (Exception e)
          {
            e.printStackTrace();
          }
        }
      
        public void subscribe(final Receiver receiver,
                              final Class eventClass)
        {
          ArrayList receiverList = null;
      
          Class leafCls = getEventLeafInterface(eventClass);
      
          if (subscribers.get(leafCls) == null)
          {
            receiverList = new ArrayList();
            subscribers.put(leafCls, receiverList);
          }
          else
          {
            receiverList = (ArrayList) subscribers.get(leafCls);
          }
      
          if (receiverList.indexOf(receiver) < 0)
          {
            receiverList.add(receiver);
          }
        }
      
        public void unsubscribe(final Receiver receiver,
                                final Class eventClass)
        {
          Class leafCls = getEventLeafInterface(eventClass);
          if (subscribers.get(leafCls) != null)
          {
            ArrayList receiverList = (ArrayList) subscribers.get(leafCls);
            receiverList.remove(receiverList);
          }
        }
      }

注意:EventChannel 类的完整源代码可从本文末尾的 下载 部分通过下载获得。

清单 5 显示事件消费者的实现摘录。

清单 5. 事件消费者的实现
class EventConsumer
       implements Runnable, ExceptionListener
     {
       private boolean running = false;
       private boolean stopped = true;
       private EventChannel eventChannel = null;
   
       private EventConsumer(EventChannel eventChannel)
       {
         this.eventChannel = eventChannel;
       }
   
       public void run()
       {
         log.info("Event Consumer started");
   
         // Create a Topic Connection, Session, and a MessageConsumer for the Topic
         // loop until stopped and distribute events to the event channel
         // using the handleEvent method  
         eventChannel.handleEvent(event);
                 
         stopped = true;
   
         log.info("Event Consumer stopped");
       }
   
       public void shutdown()
       {
         running = false;
   
         while (stopped == false)
         {
           Thread.yield();
         }
       }
     }

注意:EventConsumer 类的完整源代码可以从本文末尾的 下载 部分通过下载获得。

在 Geronimo 中部署和运行事件框架

事件框架使用部署在 Geronimo 中的 Web 应用程序来测试每个事件类型。除了事件框架外,Web 应用程序还包括一个用于输入事件消息的 HTML 表单和一个用于接收 HTTP 请求并将内容分派到事件通道的 servlet。

HTML 表单(如 图 5 所示)只允许将三种类型的事件消息发送到分派 servlet。

图 5. Web 应用程序的开始屏幕
Web 应用程序的开始屏幕

事件分派 servlet 实例化事件通道对象和三个示例事件接收者。事件接收者然后订阅给定的事件,servlet 将事件发布给事件通道对象。清单 6 显示了该 servlet。

清单 6. 分派 servlet 的实现
public class SenderServlet extends HttpServlet
      {
        private EventChannel eventChannel = null;
        private EventReceiver allTravelEventReceiver = null;
        private EventReceiver flightEventReceiver = null;
        private EventReceiver lodgingEventReceiver = null;
      
        public void init()
          throws ServletException
        {
          super.init();
      
          eventChannel = new EventChannel();
          eventChannel.start();
      
          // create event receivers
          allTravelEventReceiver =
            new EventReceiver("allTravelEventReceiver");
          flightEventReceiver =
            new EventReceiver("flightEventReceiver");
          lodgingEventReceiver =
            new EventReceiver("lodgingEventReceiver");
      
          // subscribe to all Travel events
          eventChannel.subscribe(allTravelEventReceiver,
                                 TravelEvent.class);
      
          // subscribe to Flight events
          eventChannel.subscribe(flightEventReceiver,
                                 FlightEvent.class);
      
          // subscribe to Lodging events
          eventChannel.subscribe(lodgingEventReceiver,
                                 LodgingEvent.class);
        }
      
        public void destroy()
        {
          super.destroy();
      
            // unsubscribe all event receivers and stop the event channel
        }
      
        public void doGet(HttpServletRequest req, HttpServletResponse res)
          throws IOException, ServletException
        {
          // respond with input form
        }
      
        public void doPost(HttpServletRequest req, HttpServletResponse res)
          throws IOException, ServletException
        {
          String txtMsg = req.getParameter("txtMsg");
          if (txtMsg != null && txtMsg.length() > 0)
          {
            String flightDelayed = req.getParameter("FlightDelayed");
            String rateIncreased = req.getParameter("RateIncreased");
            String seatAvailable = req.getParameter("SeatAvailable");
      
            if (flightDelayed != null)
            {
              // send a Flight event
              eventChannel.publish(new FlightDelayed(txtMsg));
            }
            else if (rateIncreased != null)
            {
              // send a Lodging event
              eventChannel.publish(new RateIncreased(txtMsg));
            }
            else if (seatAvailable != null)
            {
              // send a Flight event
              eventChannel.publish(new SeatAvailable(txtMsg));
            }
          }
      
          doGet(req, res);
        }
      }

注意:SenderServlet 类的完整源代码可以从本文末尾的 下载 部分通过下载获得。

事件分派 servlet 调用的事件框架的应用程序流程如 图 6 所示。

图 6. 事件 Web 应用程序的顺序
事件 Web 应用程序的顺序
事件 Web 应用程序的顺序

部署应用程序

事件框架的类和 Web 应用程序打包在 .war 文件中,并放置在 GERONIMO_HOME/deploy 目录下。对于创建并复制到 deploy 目录下的 .war 文件,Geronimo 在启动时会自动部署它。放置在 deploy 目录下的应用程序是热加载的,当发生更改时,Geronimo 能够在运行时重新加载应用程序。这使调试应用程序变得非常便利。

运行应用程序

您可以使用位于 GERONIMO_HOME/bin 目录下的启动脚本(startup.bat 或 startup.sh)启动 Geronimo 应用服务器。当调用 Geronimo 启动脚本时,会出现 Geronimo 控制台窗口。对于部署的事件框架的 Web 应用程序,启动时出现的 Geronimo 控制台窗口将包含类似于 清单 7 所示的行,确认 Web 应用程序已成功启动。

清单 7. Web 应用程序的成功启动
00:12:33,921 INFO  [EventChannel] Starting EventChannel...
00:12:33,937 INFO  [EventChannel] Creating topic connection...
00:12:35,062 INFO  [EventChannel] EventChannel started
00:12:35,062 INFO  [EventChannel] Event Consumer started
00:12:35,093 INFO  [SenderServlet] AllTravelEventReceiver
  [com.jeffhanson.eda.EventReceiver@f84033]
00:12:35,093 INFO  [SenderServlet] FlightEventReceiver
  [com.jeffhanson.eda.EventReceiver@3ee73b]
00:12:35,093 INFO  [SenderServlet] LodgingEventReceiver
  [com.jeffhanson.eda.EventReceiver@16127f4]

将事件发送给 servlet 后,servlet 会将它发布给事件通道。如果将包含文本 Flight 2365 to Detroit will be delayed 15 minutesFlight-Delayed 消息发送给 servlet,则 Geronimo 控制台窗口会显示 类似于 清单 8 的信息。

清单 8. 成功的事件发布
00:12:53,718 INFO  [SenderServlet] >>>>>
00:12:53,718 INFO  [SenderServlet] >>>>>
00:12:53,734 INFO  [EventChannel] Publishing event
  [com.jeffhanson.eda.events.business.FlightDelayed@863854]
00:12:53,859 INFO  [EventReceiver] EventReceiver [flightEventReceiver]
  received event [Flight 2365 to Detroit will be delayed 15 minutes]
00:12:53,859 INFO  [EventReceiver] EventReceiver [allTravelEventReceiver]
  received event [Flight 2365 to Detroit will be delayed 15 minutes]

结束语

设计能够对实时更改和事件作出及时响应的有效事件驱动软件系统是一项复杂工作。结合使用 SOA 与使用 Java 反射的有效事件驱动的交互框架可以减少复杂性,并增加灵活性。Geronimo 平台提供了 API 和工具(包括 JMS 提供程序),可以用来构建功能强大的事件驱动的交互框架。

从线性企业编程转移到面向服务的设计只能带来适用于 SOA 模型的优势。重构系统以获得业务服务会导致服务和组件的模块化框架。如果服务基础设施的交互模型敏捷而可扩展,那么您可以将这些组件重用于多种不同的应用程序。SOA、EDA 和 Apache Geronimo 为功能强大的有效软件基础设施提供了基础。


下载资源


相关主题

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Open source, Java technology
ArticleID=163806
ArticleTitle=使用 Apache Geronimo 和 JMS 构建事件驱动的框架
publish-date=09282006