IBM WebSphere Cast Iron 与 WebSphere MQ Telemetry Transport 协作实现业务消息推送

计算、移动互联网、物联网是当前最炙手可热的几个关键词,也是未来最具发展潜力的几个关键技术。云计算可以为人们提供强大的计算能力和存储能力,能够有效地解决移动设备计算能力不足和存储量小的局限性,然而实现这一切的前提是拥有良好的网络环境。基于物联网的 WebSphere MQ Telemetry Transport(简称 MQTT)相关技术在云计算和移动设备之间架起一道桥梁,在低带宽和不稳定的移动互联网中为您提供可靠的网络服务。

李 开林, 软件工程师, IBM

李开林,北京理工大学研究生,曾参与过 IBM Extreme Blue 项目实习。对于 Web 网络应用、系统整合和业务集成兴趣浓厚,近期致力于移动平台业务扩展的研究工作。



梁 荣达, 软件工程师, IBM

梁荣达,IBM 中国开发中心软件工程师,从事 MQTT 相关开发工作。曾从事 WebSphere Adapter 的开发与支持工作,熟悉企业应用开发与集成。目前关注物联网、Web 应用及移动开发等领域。



金 千里, 开发经理, IBM

金千里, BM CDL 开发经理。他多年来一直致力于应用程序集成软件的开发,并精通应用程序集成、连接解决方案、面向服务架构和其他相关技术。他撰写了一本图书、一本 IBM 红皮书和多篇 developerWorks 文章。



2012 年 6 月 21 日

引言

云计算、移动互联网、物联网是当前最炙手可热的几个关键词,也是未来最具发展潜力的几个关键技术。云计算可以为人们提供强大的计算能力和存储能力,能够有效地解决移动设备计算能力不足和存储量小的局限性,然而实现这一切的前提是拥有良好的网络环境,包括稳定的链接和高速的传输条件。然而当前移动互联网正处于起步阶段,无法提供可靠的网络保障,基于物联网的 WebSphere MQ Telemetry Transport(简称 MQTT)相关技术则恰好可以很好地弥补这一方面的缺陷,在云计算和移动设备之间架起一道桥梁,在低带宽和不稳定的网络环境中为您提供可靠的网络服务。本文将以 Smart Flight 一个典型的场景为例,详细向您介绍如何使用基于物联网的 MQTT 相关技术,使得基于云计算和移动互联网的应用更加完美。


Cast Iron 简介

IBM WebSphere Cast Iron(简称 Cast Iron)是 WebSphere 产品家族成员,它能够帮助企业完成内部应用系统与外部 SaaS 以及云应用的集成。 Cast Iron 平台提供了一种“只需配置,无需编程”的方式来帮助用户完成业务集成,从而使业务开发和集成这一过程既快速又便捷,而且极大地降低了成本。

Cast Iron 在业务集成方面有着独特的优势,用户通过 Cast Iron Studio 创建配置业务应用流程,具体包括实现与企业应用的连接、在不同应用之间进行数据的转换、采用图形化的方式定义业务逻辑; 同时 Cast Iron 还提供了安全稳定的集成项目运行环境,支持三种不同的部署方式; 此外用户还可以方便的通过控制台实现对已部署应用项目的管理。


MQTT 简介

WebSphere MQ Telemetry Transport (MQTT) 是一项为物联网而设计的消息传递技术,由 IBM® Hursley Laboratory 开发。它是一种开放、精简、轻量级和容易实现的协议。

物联网,即 Internet of Things, 简称 IoT。它是在互联网基础上的延伸和扩展的网络,用户端从传统的计算机延伸和扩展到了任何物品与物品之间,物品通过嵌入的传感器进行信息采集,然后通过小型计算设备进行网络信息交换与通信。物联网技术是当前的技术热点,在能源、电子信息、医疗、交通、零售、物流、工业制造等行业都有相当良好的应用前景。

在物联网中,MQTT 协议与相关产品负责把数据由传感器有效的传送到服务器,完成在受限、不稳定网络到因特网或企业网络的连接,实现两者互联互通。在此基础上,互通的物品不仅能通过设备采集信息、实现智能的感知,更能结合先进的信息处理、数据挖掘、人工智能等技术手段,与业务应用整合,实现从后台到前端设备的智能监控,完成进一步的信息化工作。

如上所述,MQTT 协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  1. 非常小的通信开销(最小的消息大小为 2 字节);
  2. 支持各种流行编程语言(包括 C,Java,Ruby,Python 等等)且易于使用的客户端;
  3. 支持发布 / 预定模型,简化应用程序的开发;
  4. 提供三种不同消息传递等级,让消息能按需到达目的地,适应在不稳定工作的网络传输需求

二者结合的价值

在当前的社会环境中,实际应用并不是由单个项目能够独立实现,需要由众多的网络服务合作完成。面对复杂的网络环境,业务集成与协作是不容忽视的重要因素,而这正是 Cast Iron 强大的集成能力发挥作用的场景。通过各种各样的 Connector,Cast Iron 可以将不同的系统和服务快速方便地集成到一起,通过简单的拖拽完成业务流程配置,同时利用本身所基于的云计算平台进行大规模的运算为用户提供更加友好的服务。

在传统互联网中,我们常常并不是特别在意网络状态和数据传输量,但是在特别情况下,我们就格外关心数据传输的可靠性。比如在银行交易过程中,对于提交的数据能够准确无误地到达目的地,对于我们来说是至关重要的,无论是数据异常丢失或损坏,还是多次重复提交和接收都会给我们带来致命的错误,因此不可靠的网络会造成极大程度的危险,引起数据的严重不平衡。在通常情况下我们往往需要采用编写大量的辅助代码的方式来预防这种情况的发生,这样给开发人员和测试人员造成了极大的工作量,而且还有潜在风险的可能性。

采用 MQTT 技术实现了应用程序和消息传递的解耦,在几乎所有的业务应用中都可以使用,同时可以极大程度上减轻开发人员的工作量,尤其是 MQTT 为我们提供的三种级别的传输服务:

  • 至多一次: 消息发布完全依赖于底层 TCP/IP 网络,会发生消息丢失或重复,这种级别可以用于如下情况,数据丢失一次无所谓,因为不久就会发送第二次数据
  • 至少一次:确保消息准确到达,但是消息重复可能会发生
  • 只有一次:确保消息到达,而且仅到达一次。这种级别非常适合于如下情况,在计费等系统中消息重复或丢失会导致不正确的结果

三种级别的传输服务几乎可以涵盖现实应用中所有的网络服务的要求,当然不同级别的服务对于网络和硬件的开销有所不同,可以根据实际项目的需求进行合理选择。

通过上述分析可以发现,Cast Iron 与 MQTT 的完美融合将会给我们创造极大的价值,不仅仅使我们的业务项目开发更加的简单、快捷,极大地降低我们的项目开发成本,而且我们的项目开发还可以使用简单并且可靠的数据传输方式,将业务应用以合理的方式推广到移动平台。

在传统互联网平台上,也许我们并不担心网络带宽和数据传输速度,甚至是终端的运算能力,但是针对于个别特殊的场景,则需要二者相互配合来保证数据能够及时准确地到达,同时有且仅有一次到达目的地,确保项目数据的正确性和业务运作的完整性;而在移动互联网或者是一些受限的网络环境的应用程序中,确保数据传输量的最小化,可靠地消息推送的模式,有效地保障数据及时并准确地到达目的地是相当重要的,Cast Iron 和 MQTT 的结合替代了传统的轮询模式实现消息推送,极大地减少了网络资源的消耗和浪费,同时也不再需要始终保持与服务器的连接,不需要再担心网络环境的可靠性和移动设备电量不足等问题。


示例场景描述

将 Cast Iron 和 MQTT 两者优势互补无论是在业务层面,还是在系统集成和开发层面都有着无限的价值。下面我们将以 Smart Flight 场景为例,向您详细介绍如何使 Cast Iron 和 MQTT 进行相互配合,实现数据的可靠传输,并将业务数据推送到 Android 移动平台上。

在现实生活中,用户需要乘坐飞机外出时,常常会被很多的意外因素所困扰。比如在准备赶往飞机场时,常常会害怕堵车而延误自己的行程;或者在机场候机大厅里时,也往往会担心飞机误点浪费自己的时间等等。种种因素都会破坏用户旅行的心情,下面我们就将以此场景为基础,使用 Cast Iron 和 MQTT 实现一个简单的小工具来帮助用户从这些困扰中解脱出来。

需要说明一下的是在本场景的实现中,我们假定用户的基本信息和航班信息都已经在系统中完成配置,我们在实现过程中并不需要考虑这些因素。业务规则是这样的,用户原定计划是正常情况下,早上 6:00 开始准备出发赶往飞机场。用户期望我们的系统能够在 5:00 时开始对路况进行监控,一旦发现交通拥堵或者其他紧急情况造成的路况不良,用户将会花费比正常情况下更多的时间在路上的时候,系统需要及时自动地推送消息到 Android 移动平台,提醒用户提前准备出发;而如果 5:00 到 6:00 这段时间路况一直处于正常状态的情况下,系统需要在 6:00 时准时推送消息给用户,为出发做好准备。在发送通知提醒用户的过程中,用户不希望得到过多的重复提醒。同时用户可以随时使用移动平台更新地理位置信息,即用户通知系统以新的地理位置作为出发点,系统将会自动更新配置信息,届时会监控该地点与机场之间的路况信息。


场景实现

在本章节中,我们将从整体角度向您简单介绍一下在场景实现过程中,所需要的各个组件之间是如何交互和如何进行通信的,以及为了实现本场景各个模块所应该实现的功能。针对于本场景的实现,我们所采用的系统架构如图 1 所示。

图 1. 场景系统架构图
图 1. 场景系统架构图

其中 Trigger 是我们自己定义的触发器。由于 Cast Iron 所提供的服务是无状态的服务,是无法识别服务调用者的身份的,Trigger 的作用是保持用户的基本会话信息,并且按照我们定义的规则监控事件,当现实事件满足业务规则时去启动 Cast Iron 流程,完成对路况的监控和消息的推送等功能。当然 Trigger 并不是场景的重点,读者可以选择其他一些事件处理软件,例如 IBM 的一款事件处理的产品 WebSphere Business Event(WBE),鉴于文章篇幅我们在此就不做过多介绍,感兴趣的读者可以查看其他关于 WBE 的文章。

MQTT Broker 是消息处理的服务器,负责记录消息的预订者信息,收集消息并根据消息的预订将消息推送到各个 MQTT 客户端。MQTT Broker 是基于 IBM MQTT 开放协议实现的服务端,当然也可以选择其他一些基于 IBM MQTT 开放协议的替代产品。我们推荐使用的服务端 Broker 有两款产品,一款是 IBM 公司提供的可以免费使用的软件 Really Small Message Broker(RSMB),当然这款软件并不是开源的产品;另一款是则是开源的项目 Mosquitto Message Broker。详情请查看 MQTT 官方网站

在右侧 Cast Iron 通过场景所需要的 Connector 将地理位置信息服务和航空信息服务都集成到一起,因为这些并不是本文关注的重点,而且所需要涉及到的服务比较多,所以在此用一朵云来进行表示,对于该部分感兴趣的读者建议阅读 WebSphere Cast Iron 应用集成解决方案简介

在 Smart Flight 场景中,我们在 Trigger 中定义的规则如下所示:

  • 正常提醒用户时间:6:00
  • 监控路况时间范围:5:00 - 6:00
  • 监控路况时间间隔:每 10 分钟启动一次 Cast Iron 监控流程

在本场景的实现中,关键模块之间的消息传输都是基于 MQTT 的发布 / 预订模式来进行实现的,目的是为了保障数据传输的可靠性,对此涉及到的主题我们定义如下:

表 1. MQTT 主题定义
部件发布预订
AndroidSmart Flight/Client/LocationSmart Flight/Client/Alarm
TriggerSmart Flight/Server/MonitorSmart Flight/Server/Trigger
Cast IronSmart Flight/Client/Alarm
Smart Flight/Server/Trigger
Smart Flight/Client/Location
Smart Flight/Server/Monitor

在程序运行过程中,用户的地理位置信息需要进行更新的时候,Android 移动平台将会基于主题 Smart Flight/Client/Location 发布一条消息,将自己的地理位置信息传送到 MQTT Broker,因为 Cast Iron 预订了该主题的消息,MQTT Broker 会将该消息推送到 Cast Iron。然后 Cast Iron 处理该消息的业务流程被触发,Cast Iron 一步步地去地理位置信息服务处取得所需要的地理信息计算出用户将会在正常情况下花费在路上的时间,自动生成 Trigger 更新用的配置信息,接着通过主题 Smart Flight/Server/Trigger 发布消息,MQTT Broker 收集到该消息后,将该消息推送到订阅了该主题的 Trigger,之后 Trigger 自动更新自己的配置信息。

Trigger 根据配置信息监控各种事件状态,当满足 Trigger 的触发条件时,Trigger 将会通过主题 Smart Flight/Server/Monitor 发布消息,MQTT Broker 收集消息后,将其推送到 Cast Iron,进而启动了路况监控流程,然后 Cast Iron 会通过集成了的各种地理服务取得实时路况信息,经过大量的计算分析出路况结果,发现路况状况不良后,通过主题 Smart Flight/Client/Alarm 发布提醒用户消息,因为 Android 移动平台已经对该主题进行了预订,所以 MQTT Broker 可以准确无误地将提醒消息推送到用户的 Android 移动平台,实现及时提醒用户的作用。


关键部分实现

通过上一小节的描述,相信读者已经可以非常清楚地理解整个场景的运作模式,各个部分的作用以及相互之间的通信形式,这个部分我们将向您详细介绍该场景中的关键部分的具体实现。需要说明的是,由于目前现有可以使用的产品部分条件的限制,为了能够使 Smart Flight 场景能够正常地运作,我们不得不采用了一些辅助手段,具体实现根据图 2 所示的设计。

图 2. 场景实现设计图
图 2. 场景实现设计图

这里需要重点说明的是现有的 Cast Iron 版本尚未添加对 MQTT 协议的支持,未来的版本可能会有相应的功能实现,所以为了使本场景可以正常的运作,我们添加了一个 HTTP Server 功能模块。该 Server 的功能是起到枢纽的作用,在 MQTT Broker 和 Cast Iron 之间建立通信的桥梁,协助 Cast Iron 实现 MQTT 消息发布和预定的工作。

HTTP Server 的实现可以采用一些现有的比较成熟的服务器,如 Apache、Tomcat 等,同样也可以采用多种语言实现相应的功能,如 PHP、Java 等,而且针对于不同的语言都提供有相应的 MQTT Client 工具包可以使用,在这些工具包的基础上您不需要做太多的工作就可以非常方便快速地搭建这样一个 HTTP Server,满足实验的需求。在示例代码中,我们将使用 Java 自身的类库实现了一个简单的 HTTP Server。

HTTP Server 使用简单的 HTTP 通信方式来与 Cast Iron 进行交互。当 Cast Iron 需要通过 HTTP Server 来发布消息时,通过 HTTP Post 的形式来访问 HTTP Server,HTTP Server 通过 Publish Handler 来调用底层的 MQTT Client 发布消息给 MQTT Broker;当预订的主题被推送到 HTTP Server 时,将会通过 HTTP Request 的方式调用 Cast Iron,进而触发 Cast Iron 的相应的业务流程进行处理。

下面我们将从 Cast Iron 业务流程配置、HTTP Server 服务实现和 MQTT 发布 / 预订的实现三个方面详细介绍 Smart Flight 场景的实现。

Cast Iron 业务流程配置

首先第一个要实现的场景是用户出发的地理位置发生变化时,需要修改触发器的配置信息,这时需要通过 Location 业务流程来完成相关工作,在该部分 Cast Iron 需要完成的业务流程配置如图 3 所示。

图 3. Location 业务流程配置
图 3. Location 业务流程配置

当用户通过移动平台提交变更后的地理位置信息时,Android 移动平台将会给予主题 Smart Flight/Client/Location 发布的消息经过 MQTT Broker 被推送到达 HTTP Server 之后,HTTP Server 发送 HTTP 请求到 Cast Iron 的服务。这时 Receive Request Activity 收到消息后被触发,启动图示中处理 Location 业务的流程,然后通过 Post Request Activity 去调用 Google 的地理位置服务,将取得的结果经过运算后会将正常情况下用户用户赶往飞机场所需要的时间等信息保存到数据库中,同时将生成的 Trigger 配置信息通过 Post Request Activity 发送到 HTTP Server,进而发布基于主题 Smart Flight/Server/Trigger 的更新 Trigger 配置信息的消息,然后 MQTT Broker 会将该消息准确无误地推送到 Trigger 上,通过以上流程实现了该部分的逻辑处理。

同时 Trigger 根据配置信息对事件进行监控,当 Trigger 发现满足一定条件时,需要启动监控流程开始监控路况,Cast Iron 需要完成的业务流程配置如图 4 所示。

图 4 Monitor 业务流程配置
图 4 Monitor 业务流程配置

该部分 Trigger 发布的基于 Smart Flight/Server/Monitor 的路况监控的消息经过 MQTT Broker 被推送到 HTTP Server 之后,HTTP Server 发送 HTTP 请求触发 Cast Iron Monitor 的业务流程。这时 Receive Request Activity 识别请求后,将会查询数据库取出与用户相关的数据信息,然后通过 Google 的地理位置信息服务获取到实时路况信息,通过运算对比在正常路况时用户需要花费在路上的时间,如果这时发现路上需要的时间比较长,则将信息记录下来之后发布一条基于主题 Smart Flight/Client/Alarm 的提醒消息,MQTT Broker 将消息推送到 Android 移动平台。而如果路况很正常时,则简单地将结果记录一下,并不会推送提醒消息给用户。

HTTP Server 服务实现

如果在 Cast Iron 中增加对 MQTT 协议的支持,会添加类似于 Receive Request 和 Post Request 类似的 Activity,这样 Cast Iron 便可以非常方便地去发布和预定 MQTT 消息。这里我们只能自己先简单实现一个 HTTP Server,监听本地的网络端口,当 Cast Iron 需要发布消息时将会发起 HTTP 请求,HTTP Server 收到请求后根据不同的请求内容,执行不同的处理。HTTP Server 发布 MQTT 消息实现的示例代码如下:

清单 1. HTTP Server 发布 MQTT 消息
 public void provideServer(){ 
	 HttpServerProvider provider = HttpServerProvider.provider(); 
 InetSocketAddress addr =  new InetSocketAddress(8081); 
	 try { 
 // 实例化 HTTP Server,监听本地 8081 端口
 HttpServer server = provider.createHttpServer(addr, 1); 

 //HTTP Server 发布 Smart Flight/Server/Trigger 消息
		 server.createContext("/Trigger", new HttpHandler(){ 
   public void handle(HttpExchange exchange) throws IOException { 
	 InputStream input = exchange.getRequestBody(); 
					
	 byte[] buffer = new byte[1024]; 
	 int bufferRead = input.read(buffer); 					
	 publish(new String(buffer , 0 , bufferRead), "Smart Flight/Server/Trigger", 2); 
				
	 String response = "Trigger Published"; 
	 exchange.sendResponseHeaders(200, response.length()); 
	 OutputStream output = exchange.getResponseBody(); 
	 output.write(response.getBytes()); 
	 output.flush(); 
	 output.close(); 
   } 
		 }); 
 //HTTP Server 发布 Smart Flight/Client/Alarm 消息
	 server.createContext("/Alarm", new HttpHandler(){ 
   public void handle(HttpExchange exchange) throws IOException { 
	InputStream input = exchange.getRequestBody(); 
					
	 byte[] buffer = new byte[1024]; 
	 int bufferRead = input.read(buffer); 					
	 publish(new String(buffer , 0 , bufferRead), "Smart Flight/Client/Alarm", 2); 
					
	 String response = "Alarm Published"; 
	 exchange.sendResponseHeaders(200, response.length()); 
	 OutputStream output = exchange.getResponseBody(); 
	 output.write(response.getBytes()); 
	 output.flush(); 
	output.close(); 
   } 
		 }); 
		 server.setExecutor(null); 
		 server.start(); 
	 } catch (IOException e) { 
		 e.printStackTrace(); 
	 } 
 }

以上代码便实现了一个简单 HTTP Server,用于协助 Cast Iron 完成 MQTT 消息的发布。HTTP Server 将会监听本地 8081 端口,当收到 Cast Iron 请求 http://localhost:8081/Trigger 调用的时候,HTTP Server 将会发布以 Smart Flight/Server/Trigger 为主题的消息;当 Cast Iron 请求调用 http://localhost:8081/Alarm 时,HTTPServer 将会发布以 Smart Flight/Client/Alarm 为主题的消息。在上述代码中,publish 方法是调用 MQTT Client 执行发布消息的方法,该方法的实现将会在下一部分进行详细说明。

MQTT 发布 / 预订实现

在本场景的实现中,Android 移动平台、Trigger、HTTP Server 都需要实现 MQTT Client,即 MQTT 的发布和预定功能。在本小节中,我们将会详细向您介绍 MQTT 发布 / 预订功能的实现,以及三个不同的组件中各需要做哪些相应的处理。本部分将以 HTTP Server 部分的 MQTT Client 的实现为例进行说明。

  1. 导入 MQTT 工具包

将 com.ibm.micro.client.mqttv3.jar 工具包导入到 Java 运行环境中,并在代码中添加如下代码

清单 2. 导入 MQTT Java 包
 import com.ibm.micro.client.mqttv3.MqttCallback; 
 import com.ibm.micro.client.mqttv3.MqttClient; 
 import com.ibm.micro.client.mqttv3.MqttConnectOptions; 
 import com.ibm.micro.client.mqttv3.MqttDeliveryToken; 
 import com.ibm.micro.client.mqttv3.MqttException; 
 import com.ibm.micro.client.mqttv3.MqttMessage; 
 import com.ibm.micro.client.mqttv3.MqttSecurityException; 
 import com.ibm.micro.client.mqttv3.MqttTopic;
  1. 创建 Client,示例代码如下
清单 3. 创建 MQTT Client 实例
 public void initClient(){ 
 try { 
 if(this.client  == null){ 
	 this.client = new MqttClient(serverURI, clientId); 
 } 
 } catch (MqttException e) { 
		 e.printStackTrace(); 
 } 
 }

MQTT Client 和 Broker 在建立连接的过程中,不允许出现相同的客户端 ID,一旦出现相同的客户端 ID,则之前建立的链接将会被中断,所以需要保证只创建一次连接。

  1. 添加发布消息代码,即 publish 方法实现,示例如下
清单 4 使用 MQTT 发布消息
 public boolean publish(String topic , String message , int qos){ 
 initClient(); 
 try { 
	 MqttTopic mqTopic = client.getTopic(topic); 
	 MqttMessage mqMsg = new MqttMessage(message.getBytes()); 
	 mqMsg.setQos(qos); 
	 if(!client.isConnected()) 
		 client.connect(); 
	 MqttDeliveryToken token = mqTopic.publish(mqMsg); 
	 while(!token.isComplete()) 
		 token.waitForCompletion(1000); 
 client.disconnect(); 
 } catch (MqttSecurityException e) { 
	 e.printStackTrace(); 
 } catch (MqttException e) { 
	 e.printStackTrace(); 
 } 
	 return false; 
 }
  1. 添加预订消息部分代码,示例如下
清单 5 使用 MQTT 预订消息
 public void subscribe(String topicStr , int qos){ 
 initClient(); 
 try { 
 if(!client.isConnected()){ 
 MqttConnectOptions options = new MqttConnectOptions(); 
		 options.setCleanSession(false); 
 MqttCallback callback = new CallBack(this.client , options , 
 topicStr , qos); 
		 client.setCallback(callback); 			
		 client.connect(options); 
	 } 
	 client.subscribe(topicStr, qos); 
 } catch (MqttException e) { 
	 e.printStackTrace(); 
 } catch (Exception e) { 
	 e.printStackTrace(); 
 } 
 }
  1. 创建 CallBack 类,示例代码如下
清单 6 实现 MQTT CallBack 类
 public class CallBack implements MqttCallback { 
 private MqttClient client; 
 private MqttConnectOptions options; 
 public CallBack(MqttClient client , MqttConnectOptions options ,
  String topic , int qos){ 
	 this.client = client; 
	 this.options = options; 
 } 
	
 @Override 
 public void connectionLost(Throwable arg0) { 
	 try { 
		 if(!client.isConnected()){ 
			 Thread.sleep(5000); 
			 client.connect(options); 
			 System.out.println("Connected"); 
		 } 
	 } catch (MqttException e) { 
		 e.printStackTrace(); 
	 } catch (Exception e) { 
		 e.printStackTrace(); 
	 } 
 } 

 @Override 
 public void deliveryComplete(MqttDeliveryToken arg0) { 
		 System.out.println("deliveryComplete"); 
 } 

 @Override 
 public void messageArrived(MqttTopic topic, MqttMessage msg) 
		 throws Exception { 
    if(topic.toString().equals("Smart Flight/Client/Location")){ 
		 callCastIron("Location" , msg); 
 } 
 else if(topic.toString().equals("Smart Flight/Server/Monitor")){ 
		 callCastIron("Monitor" , msg); 
	 } 		
 } 
	
 private void callCastIron(String path , MqttMessage msg) throws Exception{ 
	 URL url = new URL("http://Server IP/" + path); 
 HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 
	 conn.setDoOutput(true); 
	 conn.setRequestMethod("POST"); 
	 OutputStream output = conn.getOutputStream(); 
	 output.write(msg.toString().getBytes()); 
	 output.flush(); 
	 output.close(); 
	 int code = conn.getResponseCode(); 
	 String resMsg = conn.getResponseMessage(); 
 System.out.println("Res Code:" + code + " Res Message:" + resMsg); 
 } 
 }

CallBack 类是 MQTT Client 处理事件的类,需要完成的最重要的工作是实现 messageArrived 方法和 connectionLost 方法。其中 messageArrived 方法,是当消息被推送到该 Client 的时候回调的方法,而 connectionLost 方法是当连接中断的时候回调的方法,合理利用这两个方法便能够处理好主题的预订。上述代码中,HTTP Server 中正是在 messageArrived 方法中通过 HTTP 请求去触发 Cast Iron 的流程,在 connectionLost 方法中每隔一段时间重新连接 MQTT Broker。

在 Smart Flight 场景中,针对于 Android 移动平台的 MQTT Client 需要在 messageArrived 方法中,通过 topic 获取到推送的消息后执行界面的一些处理,显示在 Android 移动平台上,例如调用闹钟模块启动闹钟提醒用户;同样在 Trigger 中需要在 messageArrived 方法中添加处理更新 Trigger 配置信息的代码。

完成以上五个步骤的工作之后,很容易便针对不同的模块搭建各自的 MQTT Client。这样本场景实现的几个关键部分也都已经完成。通过 Trigger、Android 移动平台设备和 Cast Iron 的业务配置,便可以在 Smart Flight 场景中轻松实现数据的可靠传递和推送。


结束语

Cast Iron 以云计算平台为基础,简单方便的业务配置模式,在业务集成方面有着独特的优势。MQTT 简单的传输协议,稳定可靠的传输保障,使其在物联网领域发挥着独特的作用。将 Cast Iron 和 MQTT 完美融合,将会使我们的业务整合和集成工作变得更加简单,未来将在业务向移动平台扩展方面展现出更高地价值,极大降低工作人员的开发成本。


致谢

在本文完成之际,特别感谢 Cast Iron 和 MQTT 中国开发团队的支持和帮助,感谢向荣、杜冰冰、魏国兴、吴卫、李铮的指导和建议,本文的完成得益于每位同事的不懈努力。

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere, Cloud computing, 移动开发
ArticleID=822228
ArticleTitle=IBM WebSphere Cast Iron 与 WebSphere MQ Telemetry Transport 协作实现业务消息推送
publish-date=06212012