使用普及消息传递和 IBM Lotus Expeditor 微代理(micro broker)实现端到端的集成

Comments

面向服务的架构(SOA)的一个基本要求是连接。通过 IBM WebSphere MQ 和 IBM WebSphereMessage Broker 之类的产品可以实现企业级应用程序和服务之间的连接。将 IBM Lotus Expeditor 微代理(Lotus Expeditor V6.1 的一个新特性)添加到解决方案中即可连接到移动设备和普及性设备,从而在 SOA 中实现真正的端到端集成。本文介绍了消息传递和 Lotus Expeditor 微代理的关键概念,以及使用这两种特性的益处。了解如何设计业务集成解决方案,将集成层扩展到企业的移动设备或嵌入式设备的极致。

消息传递简介

IBM WebSphere MQ 之类的消息传递中间件软件提供了连网应用程序和服务之间的连接。中间件层为开发健壮的相互通信的应用程序提供了一个平台,与通信的具体实现方式分开考虑。因此,中间件层更注重考虑应用程序的核心业务功能。中间件改进了应用程序和组件之间的松散耦合,从而得到更为灵活的架构。

在消息传递中间件中,通过使用一个名为消息代理(或队列管理程序)的服务器组件以消息的形式在系统间传递数据来实现连接。消息是应用程序数据包,通常包括数据有效负载和消息的可配置的报头属性。消息代理或队列管理程序根据消息中描述的特定目的地确定接收消息的客户机,从而实现消息传递。消息传递可以同步,即客户机应用程序显式地请求下一条消息(客户机请求),也可以异步,即在匹配客户机设定时由代理发送消息(服务器发送)。在消息传递系统中使用以下两个范例来描述目的地:

  • 点对点消息传递范例
  • 发布-订阅消息传递范例

点对点消息传递

点对点消息传递是实现两个应用程序间的一对一传递语义的一种方法。点对点消息传递中将目的地定义为队列。简言之,一个应用程序将消息放入代理或队列管理程序中的给定队列中,而另一个应用程序从队列中读取该消息,如图 1 所示。发送方将消息传递给接收方应用程序时使用先进先出(FIFO)机制。

图 1. 使用点对点消息传递范例的消息传递
使用点对点消息传递范例的消息传递
使用点对点消息传递范例的消息传递

零售渠道可看作点对点消息传递范例的一个例子,其中由商店创建订单队列,在商店的仓库处依次从队列提取订单,从而进行发货。

发布-订阅消息传递

发布-订阅消息传递是针对一条特定消息在应用程序间实现一对多传递语义的消息传递方式,如图 2 所示。发布-订阅消息的目的地称为主题。主题提供了分层的命名结构,因此应用程序(订阅方)可以灵活地进行注册。主题是一个字符字符串,层次结构的每一层都以某种形式分界(通常使用正斜杠),例如 stocks/IBM。发送消息时,由发送方指定目标主题。代理为所有使用此主题的订阅方匹配一条入站消息,并将该消息的副本发送给每个匹配的订阅方。如果发布方和订阅方指定的是完全相同的主题名称,则可能是精确匹配,否则订阅方可以指定通配符来匹配很多不同的发布主题。例如,订阅 stocks/+ 的订阅方可以收到指定为两层层次结构的第一层为 stocks 的任何主题的消息(因此发布给 stocks/IBM 和 stocks/MyCorp 的消息都可以匹配)。

图 2. 使用发布-订阅消息传递范例的消息传递
使用发布-订阅范例的消息传递
使用发布-订阅范例的消息传递

发布-订阅消息传递范例特别适合于事件驱动的系统,其中有很多其他的应用程序响应某个给定的触发器,比如传感器发送读取(比如温度)的请求,然后各种显示设备和后端系统接收结果的通知。

服务质量和持久性

在消息传递中,服务质量主要涉及应用程序间传递数据的方法和遭遇某种故障时应用程序数据的恢复能力。选择合适的服务质量通常需要在以下两个方面权衡:数据的易变性和相对重要性,以及实现期望的服务质量所消耗的系统资源数量。高服务质量通常要求在应用程序客户机和消息代理 之间执行一系列的交换来尽可能地保证只传递一次消息。低服务质量要求客户机和消息代理之间交换次数通常较少(相应地可在资源和网络利用方面受益),但是它们在消息传递方面不能提供较高的质量保证,比如消息重复和消息丢失的几率有所增加。

消息的持久性也存在相似的权衡;主要涉及消息抵达消息代理后的寿命。持久性消息会写入磁盘,因此如果消息代理因为某种原因发生崩溃,则可以恢复这些消息。非持久性消息只存在于消息代理的内存中,出现崩溃时很可能丢失这些消息。与服务质量一样,需要在某种系统资源(本例中是磁盘)和针对消息代理出现问题时恢复消息所需的磁盘 I/O 操作带来的延迟之间进行权衡。财务应用程序鉴于其数据的重要性通常使用高服务质量和持久性消息。如果应用程序数据更新较为频繁,就每条消息而言相对价值较低,并且/或者得到了大量部署,则这类应用程序可以使用较低的服务质量。互联网上的体育记分板就是此类应用程序的一个例子。

消息传递客户机、JMS 和 JNDI

应用程序为了能够使用消息代理进行通信,使用了消息传递客户机库。消息传递中间件供应商通常提供了各种客户机库来支持特定的应用程序平台。某些客户机库提供了特定于供应商的专有应用程序编程接口(API)。消息传递客户机库通常将连接的意图公开给消息代理和多个相关的动作,比如发送、接收、连接和断开连接。客户机库还经常包含了一些用于定义中间件实体(例如消息、队列和主题)的消息专有的数据类型;另外还包含了一些异常处理例程,使应用程序能够处理发生故障的场景。

J2EE 为 Java 定义了 Java Messaging Service (JMS) 标准,从而指定了一组标准的 Java 接口和行为语义,供应商可以根据这些接口和语义开发 Java 客户机库。此标准使应用程序能够被写到标准的 JMS 接口,实现客户机的某种行为而无需使用任何特定的软件供应商的产品。如果 JMS 提供者支持使用 Java Naming and Directory Interface (JNDI) 提供者,则可以进一步解除应用程序和它的受管理的运行时环境之间的耦合,通过将指定的消息传递资源存储为 JNDI 存储库中的受管理对象(而不是将引用硬编码为代码中的队列和主题的应用程序)即可实现此目的。 这样,如果 J2EE 应用程序的管理员更改了队列名称,那么只要受管理对象的存储名称与 JNDI 存储库中名称相同,就不需要更改应用程序。很多企业的消息传递供应商都支持这种功能,包括 IBM WebSphere MQ、IBM WebSphere Process Server 和 IBM WebSphere Enterprise Server Bus,以及 IBM Lotus Expeditor 微代理。

普及消息传递和微代理

一般说来,消息传递中间件旨在部署在企业的基础设施上,后者通常指大量的大型服务器和充裕的网络带宽的结合。这就意味着企业应用程序可以处理大规模应用程序(如银行系统)的要求。自 20 世纪 90 年代中后期以来,普及计算的现象表明在移动设备(如 PDA 和移动电话)上和非计算领域(如汽车和家用电器)中逐渐能够使用计算功能。由于这些固态设备的功能得到了增强,因此它们更有可能参与到企业面向服务的架构(SOA)。普及消息传递使人、应用程序和设备能够共享信息并且彼此响应,而不论其相对位置如何。

Supervisory Control and Data Acquisition (SCADA) 和遥感勘测应用程序是设备间通信的很好示例,解释了如何获取来自某些遥远位置(如电站、炼油厂等)的信息以及如何将这些信息传入企业的后端,以便根据现场传回的信息作出更好、更及时的决策。设备间通信还有助于企业内部作出决策来影响远程位置的更改(如打开或关闭阀门或停止设备运行)。普及消息传递将人与流动人员使用的设备联系起来,其中的流动人员并不指办公室人员,并且使用的是移动设备而非台式机或笔记本。这样的示例有调度驱动程序所使用的传递记录应用程序,来自现场销售代表的订单输入系统等。

普及域给那些希望为此类场景提供解决方案的人们提出了很多挑战。首先,设备仅仅是设备,而不是专门的计算机。其主要作用不是计算,并且通常对其物理规模有严格的要求(与手持设备一样)。这就自然地限制了其作为计算平台的能力;计算资源容易收到处理能力、内存和持久性存储的限制。另外,在很多情况下只能通过缓慢、不可靠或是昂贵(常常兼而有之)的网络连接才能连接到设备。普及消息传递提供了针对此域的特殊要求进行优化的中间件功能。Lotus Expeditor 微代理器的目标是通过为普及域提供以下的特殊功能来满足上述场景的需求:

  • 一个小型的内存占用消息代理(微代理)用作资源受限环境中的集成中心,并链接到企业后端
  • 应用程序客户机库旨在使用了经过优化的普及消息传递协议的各种平台之上运行,这些平台包括 J2ME、J2SE 、J2EE(JMS 和 JNDI)
  • 到企业消息传递服务器后端的桥接

IBM Lotus Expeditor 微代理

顾名思义,IBM Lotus Expeditor 微代理实际上是发布-订阅消息代理服务器的一个内存占用很少的实现,与企业的消息代理(如 IBM WebSphere Message Broker)相比,微代理被设计为在设备上运行,而不是在大规模的服务器基础设施上运行。微代理使用 Java 实现,并且在 Lotus Expeditor 和 J2SE Java 运行时所提供的台式机和设备运行时上运行。客户机应用程序可以通过 TCP/IP 远程连接到微代理或在同一个 Java 运行时中连接到微代理。Lotus Expeditor Toolkit 提供了用于微代理客户机应用程序开发和测试的开发平台和工具集。

微代理自身可以独立模式运行,也可以桥接到企业的消息传递服务器中。在这个桥接中,用户可以定义转换逻辑来修改(或者甚至抛弃)远端和企业之间的消息流。

微代理的典型使用模式包括:

  • 作为远端站点上的本地集成中心,使现场应用程序能够轻松地以一种松散耦合的形式进行通信,不需要在应用程序间具备专有的设备接口。
  • 作为收集企业最远端的数据并选择性地将其传递到后端的方法。这样的示例包括 Radio-Frequency ID (RFID) 场景,其中通过阅读器的每个标记都可能生成几个读取事件,而只有一个事件应该与企业通信。
  • 作为进程间通信组件,在给定的 Java 运行时中或几个 Java 和非 Java 运行时之间。

微代理及其相关的组件构建在 OSGI 技术之上,并且旨在 OSGI 运行时之中运行。这个开放式平台改进了面向服务的模型以便开发设备应用程序。微代理及其客户机库包装在一组 OSGI 包中,使客户机和管理应用程序能够使用模块化的 OSGI 框架。

管理微代理

微代理使用编程式的 Java 编程接口进行管理。管理接口使用户能够查询给定的微代理实例及其连接的客户机的各个方面。用户可以获得微代理内部的面向对象的表示(比如发布-订阅匹配引擎),并可以进行信息查询,比如连接客户机的数量和收发消息的数量。

微代理的核心包将 BrokerFactory 服务公开给 OSGI 运行时,这些包可以使用 OSGI Service Tracker 对 OSGI 运行时进行跟踪。清单 1 给出了一个示例来说明如何实现该处理。

清单 1. 使用微代理管理 API
public class TestActivator implements BundleActivator, ServiceTrackerCustomizer {

	ServiceTracker tracker = null;
	BundleContext bundleContext = null;
	
	public void start(BundleContext context) throws Exception {
		// Keep the reference to the bundle context
		bundleContext = context;
		// Track for the Broker Factory service
tracker = new ServiceTracker(context,
“com.ibm.micro.admin.BrokerFactory",
this);
// Will call us back on addingService()
		tracker.open();
	}

	…

	public Object addingService(ServiceReference reference) {
		BrokerFactory factory = 
			(BrokerFactory)bundleContext.getService(reference);
		try {
			// Get a reference to the TestBroker instance
			// by name as we are in the same JVM
			LocalBroker broker = factory.getByName("TestBroker");
			// Simply print out the number of bytes received
			// by the broker
			System.out.println(broker.getBytesReceived());

			// We could also get by address for a remote broker
			RemoteBroker remoteBroker = 
factory.getByAddress("tcp://localhost:1883");
			remoteBroker.logon("Admin", "password"); 
// Simply spit out the number of connected clients
			System.out.println(remoteBroker.getCommunications().
getNumberOfClients());
			remoteBroker.logoff();

		} catch (AdminException e) {
			e.printStackTrace();
		}						
		return f;
	}

}

微代理客户机

微代理提供了两个客户机库,一个 JMS 和一个用于专家设备域的专有客户机。这两个客户机都支持发布-订阅消息传递范例。

建议使用 JMS 客户机开发 Lotus Expeditor 环境下的应用程序。微代理的 JMS 客户机实现支持 JNDI 管理模型,因此,受其管理的对象可作为 JNDI JMS 提供者集成到 J2EE 运行时中。微代理的 JMS 客户机支持 JMS 的发布-订阅域时存在以下限制:

  • 仅支持 JMS Text 和 Bytes 消息。
  • 不支持分布式(XA)事务处理。

按照 JMS 规范,微代理的 JMS 客户机允许消息传递应用程序通过一个侦听程序同步或异步地从微代理接收消息。微代理 JMS 客户机支持 JMS 中定义的持久性和非持久性服务质量。清单 2 给出了一个简单的微代理 JMS 客户机应用程序。

清单 2. 一个使用 JMS API 的微代理客户机应用程序例子
JmsFactoryFactory jmsFactory = JmsFactoryFactory
	.getInstance(MQTTConstants.PROVIDER_NAME);

// Create a micro broker topic connection factory
TopicConnectionFactory connFactory = jmsFactory
	.createTopicConnectionFactory();
// Set the URI telling the factory how to connect to the server
((JmsConnectionFactory) connFactory).setStringProperty(
		MQTTConstants.MQTT_CONNECTION_URL,
		MQTTConstants.MQTT_TCP_SCHEMA + "127.0.0.1:1883");
// Create a connection object
TopicConnection conn = connFactory.createTopicConnection();
// Must set a client ID
conn.setClientID("jmsClient");
// Start the connection (connect).
conn.start();
// Create a transacted JMS session. A transacted session means
// we have to explicitly commit any sent or received messages.
TopicSession session = conn.createTopicSession(true,
		Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("test");

// Create a message consumer to receive messages on the topic 
// we just created
MessageConsumer consumer = session.createConsumer(topic);

// Send 10 text messages and receive them using our consumer
for (int i = 0; i < 10; i++) {
	TextMessage message = session.
createTextMessage("Test " + (i + 1));
	MessageProducer producer = session.createProducer(topic);
producer.send(message);
	session.commit();
	producer.close();
			
	TextMessage r = (TextMessage) consumer.receive();
	session.commit();

	System.out.println("Received: " + r.getText());
}

// Close the consumer and session
consumer.close();
session.close();
		
// Now tidy up the connection
conn.close();

微代理支持专家级的普及消息传递 MQTT 协议规范,并提供专有的 Java MQTT 消息传递客户机。有关 MQTT 规范的更多信息,请参见 Web 站点 http://www.mqtt.org。此客户机可以在专家设备域中使用。MQTT 客户机的语义比 JMS 简单很多,因为没有会话、用户、生产者或事务处理的应用程序控制这样的概念。应用程序消息通过一个简单的回调机制异步传递,并且只以字节数组有效负载的格式提供。MQTT 消息也没有 JMS 的那些用户自定义的报头属性的概念。MQTT 客户机编程模型还包含了一个通知方法作为其回调机制的组件,向应用程序客户机表明到代理的连接已经丢失,因此应用程序可以决定是否要重新连接。

MQTT 客户机支持 MQTT 协议规范为向代理发送消息而定义的三种服务质量:

  • 0 – 传递消息至多一次并且消息不持久保存在代理中的磁盘上。
  • 1 – 传递消息至少一次(即可能重复传递)并且将消息持久保存在代理中的磁盘上。
  • 2 – 传递消息一次且仅一次,并且将消息持久保存在代理中的磁盘上。

注意,订阅 MQTT 客户机指定了每次订阅的接收服务质量 —— 当发布与订阅方相匹配且订阅的服务质量级别较低时,消息被降级到订阅的服务质量。清单 3 给出了一个简单的 MQTT 应用程序示例。

清单 3. 一个使用 MQTT 客户机的微代理客户机应用程序示例
// Set any specific connection properties using a properties object
MqttProperties mqttProps = new MqttProperties();
// Stateless client -- clear up any persisted data for the client
mqttProps.setCleanStart( true );
// Create the client from the factory
// The client id for this client is "testClient" and the URL in the 
// second
// parameter describes the location of the remote broker. 
// In this case we are
// connecting over TCP/IP to a broker on the host "mybroker" on 
// port 1883.
MqttClient client = MqttClientFactory.INSTANCE.createMqttClient("testClient", 
				brokerURL, mqttProps);
// Connect to the broker
client.connect();
// Publish a message to the a/b topic at QoS 2
client.publish("a/b", new MqttPayload(("Hello World").getBytes(),0), (byte) 2 , false);
// Then disconnect
client.disconnect();

注:MQTT 协议也受 IBM WebSphere Message Broker V6 的支持。

微代理桥接器

微代理桥接器是一个关键的组件,提供了在普及消息传递域边界和企业后端之间的链接。如图 3 所示,微代理桥接器提供了连接到远端微代理的三种方式:

  • 通过 MQTT 协议连接到另一个微代理或 WebSphere Message Broker 服务器
  • 通过 JMS 连接到 WebSphere MQ 队列管理程序
  • 使用由 JNDI 管理的对象连接到第三方 JMS 提供者。
图 3. 一个 微代理桥接器拓扑示例
一个微代理桥接器拓扑示例
一个微代理桥接器拓扑示例

该桥接器定义了以下的概念来将微代理链接到远端的对等组件:

  • 管道表示微代理和它的对等组件之间的双向连接。
  • 端点专有的连接,比如 JNDI 存储库中的 JNDI 连接工厂。该连接与一个给定的管道相关联。
  • 流表示微代理和它的远端对等组件之间的消息流。管道可以定义入站流(接收消息到微代理中)和出站流(从微代理向外发送消息)。每个流定义了一组源目的地(例如,一组主题和队列)和一个目标目的地(消息将发往该目的地)。
  • 可以对每个流定义一系列转换。这些转换可以在通过桥接器传递消息时修改或抛弃消息。

桥接器使微代理可以使用多种拓扑结构进行部署,比如一组相互链接的 微代理或本地站点和远程后端之间的一个简单链接。修改和抛弃消息的能力提供了一个功能强大的机制,微代理可以使用这种机制将本地应用程序逻辑和要求与企业后端隔离开来,从而只发送回相关的信息,发送格式应尽可能地便于企业消息传递服务器使用。

使用微代理的管理编程接口配置该桥接器。可以定义一个通过 MQTT 连接到另一个微代理的管道,如清单 4 所示,使用微代理管理接口。

清单 4. 使用微代理管理 API 配置桥接器
// Obtain a reference to the bridge from the administrative broker 
// object
LocalBroker broker = factory.getByName(“TestBroker”);
Bridge bridge = broker.getBridge();

// Create a definition for a Pipe		
PipeDefinition pipeDef = bridge.createPipeDefinition("anMQTTPipe");
	
// Connection definitions describe the endpoint of the pipe
// and are target-specific.	
MQTTConnectionDefinition swtest = 
bridge.createMQTTConnectionDefinition("swtest");
swtest.setHost("mybrokerserver.com");
swtest.setPort(1883);
pipeDef.setConnection(swtest);

// Create a message flow over the pipe
FlowDefinition flow1 = bridge.createFlowDefinition("flow1");
// This will set the source to a topic named “outbound” – this is where
// messages will be consumed from.
flow1.setSources(new 
TopicDefinition[]{ bridge.createTopicDefinition("outbound") });

// Set the target (i.e. where messages will be sent to) to a topic
// named “inbound” on the remote broker
flow1.setTarget(bridge.createTopicDefinition("inbound"));

// Sets the flow as the outbound flow to send messages out of the given
// micro broker into a remotely bridged broker as described above.
pipeDef.addOutboundFlow(flow1);

// Add the pipe to the bridge 
bridge.addPipe(pipeDef);
// Start all the configured pipes.
bridge.startAllPipes();

结束语

消息传递中间件提供了一个洁净、分离的平台,用于在应用程序服务和组件之间进行健壮可靠的相互通信。在企业消息传递为大规模的后端系统提供此功能的场景中,普及消息传递通过提供专家协议和组件来链接远端设备和企业,从而拓宽企业系统的作用范围。Lotus Expeditor 微代理及其客户机提供了一个可部署在资源受限的普及设备中、并且可以利用桥接器功能往回连接到企业的集成中心。在桥接中,可以修改(甚至抛弃)消息从而可以以最有效率的方式执行到企业的链接。这样,Lotus Expeditor 微代理和企业消息传递基础设施的结合使得我们能够在 SOA 解决方案中实现真正的端到端集成。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Lotus, WebSphere
ArticleID=326552
ArticleTitle=使用普及消息传递和 IBM Lotus Expeditor 微代理(micro broker)实现端到端的集成
publish-date=07072008