利用 Amazon Web Services 集成企业应用程序

使用 Amazon SQS 发送 XML 消息

探索如何利用 XML 和 Amazon Web Services 集成企业应用程序,以及使用 Microsoft® .NET (C#) 和 Java™ 平台构建跨平台应用程序集成功能。

Brian J Stewart, 首席顾问, Aqua Data Technologies, Inc.

Brian J. Stewart 目前是 Aqua Data Technologies 的一位首席顾问。他是这家公司的创始人,该公司关注内容管理、XML 技术和企业客户端/服务器与 Web 系统。他架构并开发了基于 J2EE 和 .NET 平台的企业解决方案。



2009 年 7 月 27 日

队列 是用于存储等待处理的消息的临时数据结构。Amazon Simple Queue Services (Amazon SQS) 是一个支持 Web 服务的高可用性可伸缩消息队列。Amazon SQS 的主要益处包括:

常用缩略词

  • API:应用编程接口
  • DOM:文档对象模型
  • HTTP:超文本传输协议
  • XML:可扩展标记语言
  • 基于云的解决方案。由 Amazon 管理,不需使用私有基础设施,也不需要专业支持知识。
  • 基于 Internet。任何连接到 Internet 的客户端都可以通过 Web 服务访问该服务,因此支持业务到业务(B2B)集成。
  • 冗余。该服务在多个服务器上存储所有消息,以提供高可用性和容错。
  • 多个并发读/写。Amazon SQS 支持多个进程同时读写一个队列,以及在处理窗口时锁定消息,以避免两个客户端同时处理一条消息。
  • 可配置。通过使用 Amazon SQS 服务,您可以根据存储在队列中的消息的处理需求设计和锁定窗口。锁定窗口能够阻止两个队列读取器同时处理同一个队列项。对于处理时间更长的队列项,则需要更长时间地锁定窗口。锁定窗口由可见性超时参数控制,您可以给每个队列配置该参数。
  • 易于使用的 API。它为常见的语言(包括 Java 和 Microsoft .NET 平台)提供 API 包装器,以支持快速开发并无缝地集成到现有应用程序中。
  • 低成本的解决方案。公司仅需为它们的 HTTP 请求使用的带宽付费;Amazon SQS 不收取其他额外的费用。

在开始 Amazon SQS 开发之前,了解它的一些特征是非常有帮助的。如果不了解这些特征,您刚开始使用 Amazon SQS 时可能会碰到挫折,或感到困惑。

首先,Amazon 不能保证队列项的处理顺序。这意味着先进先出(first-in-first-out,FIFO)处理不得到保证,这在很多消息队列实现中都很常见。Amazon 仅保证所有消息都分发出去。

云计算空间

您是否希望随时获取最新的云计算消息?是否想得到云计算相关的技术知识?developerWorks 云计算空间就是这样一个云计算信息资源的门户,在这里您可以了解来自 IBM 和业界其他媒体的最新信息,并且得到如何在云环境中使用 IBM 软件的入门知识。

IBM 在 Amazon EC2 云计算环境中提供了 DB2、Informix、Lotus、WebSphere 等方面的 AMI 镜像资源。您只需按使用量支付少量费用,就可以使用到云上的数据、门户、Web 内容管理、情景应用等服务。欢迎您随时访问 云计算空间,获取更多信息。

Amazon SQS 的第二大特征是最终一致性。大型数据库系统的主要特征是一致性、高可用性和可伸缩性。Amazon 不是关注所有 3 个特征,而是主要关注高可用性和可伸缩性,然后再以此为基础提供最终的一致性。这意味着 Amazon 通过将所有消息发送到多个服务器来实现高可用性和可伸缩性。Amazon 保证最终会将所有消息分发出去,但不保证什么时候分发它们。从实用的角度看,这意味着假如您向一个队列发送 3 条消息,当下次尝试接收这些消息时,不一定能收到所有 3 条消息。您可能在一个 Read 中收到所有 3 条消息,或在第一个 Read 中收到前两条消息,在第二个 Read 中收到第三条消息。如果您持续地轮询该队列,最终肯定能收到所有 3 条消息。

为了开始使用 Amazon SQS,您必须根据自己使用的语言获取 Amazon SQS 的 API 库。Amazon 为所有常见的语言都提供了一个库,比如 Perl、Microsoft Visual Basic®.NET、C#、Java 和 PHP。这些库是开源的,并且易于使用。参考资料 小节提供这些库的下载链接。

通过 Java 语言使用 Amazon SQS

现在您首先学习如何使用 Java 语言创建队列、发送消息和接收消息。第一步是创建一个 Amazon SQS 队列。清单 1 中的代码显示了如何为 Amazon SQS 创建 HTTP 客户端、实例化 CreateQueueRequest 对象和调用队列创建请求。Access Key ID(由 20 个字母和数字组成)是请求身份验证或读取队列项所需的密匙。为了创建或操作队列项,您需要使用 Secret Access Key(由 40 个字母和数字组成)。注册 Amazon 时就会收到这些密匙。(参考资料 小节提供一个文章链接,这篇关于云安全的文章进一步讨论了这些密匙)。

清单 1. 创建 Amazon SQS 队列
String queueName = "TestQueue";

// create http client
AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

// instantiate create queue request
CreateQueueRequest request = new CreateQueueRequest();
request.setQueueName(queueName);
request.setDefaultVisibilityTimeout(30);

// execute create queue operation and get the server response
System.out.print("Creating Queue: " + queueName);    
CreateQueueResponse response = service.createQueue(request);
if (response.isSetCreateQueueResult()) {
	System.out.print("Create Queue Result:");    
	CreateQueueResult createQueueResult = response.getCreateQueueResult();
	if (createQueueResult.isSetQueueUrl()) {
		System.out.print("Queue Url: " + createQueueResult.getQueueUrl());
	}
}

下一步是向最新创建的队列发送一条消息。清单 2 中的代码显示了如何为 Amazon SQS 创建 HTTP 客户端,以及如何向队列发送一个简单的消息。

清单 2. 向队列发送消息
String queueName = "TestQueue";

// create http client
AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

// instantiate send message request
SendMessageRequest request = new SendMessageRequest();
request.setQueueName(queueName);
request.setMessageBody("Test SQS Message");

// execute the send message operation and get the server response
SendMessageResponse response = service.sendMessage(request);
if (response.isSetSendMessageResult()) {
	System.out.print("Send Message Result: ");

	SendMessageResult sendMessageResult = response.getSendMessageResult();
	if (sendMessageResult.isSetMessageId()) {
		System.out.print("\tMessageId: " + sendMessageResult.getMessageId());
	}
}

现在,我们尝试从队列接收消息。清单 3 显示了如何为 Amazon SQS 创建 HTTP 客户端,以及如何从队列接收消息。Message 包含来自队列的消息并公开几个关键方法:

  • getMessageId返回消息的唯一标识符。您可以使用 isSetMessageId 确定消息 ID 是否已设置。
  • getReceiptHandle将句柄返回给消息。句柄用于删除消息。您可以使用 isSetReceiptHandle 确定消息句柄是否已设置。
  • getBody以字符串的形式返回消息体。消息可以是纯文本或 XML,您可以使用 isSetBody 确定消息体是否已设置。
清单 3. 从队列接收消息
String queueName = "TestQueue";

// create http client
AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

// instantiate the receive message request
ReceiveMessageRequest request = new ReceiveMessageRequest();
request.setQueueName(queueName);
// the following two parameters are optional
request.setMaxNumberOfMessages(10); // set maximum number of messages to receive
request.setVisibilityTimeout(30); // set visibility window
		 
// execute the receive messages operation and get server response
ReceiveMessageResponse response = service.receiveMessage(request);

System.out.print("Receive Message Response:");

if (response.isSetReceiveMessageResult()) {
	ReceiveMessageResult  receiveMessageResult = response.getReceiveMessageResult();
	java.util.List<Message> messageList = receiveMessageResult.getMessage();
	for (Message message : messageList) {			
		if (message.isSetMessageId()) {
			System.out.print("MessageId: " + message.getMessageId());
		}

		if (message.isSetReceiptHandle()) {
			System.out.print("ReceiptHandle: " + message.getReceiptHandle());
		}
		if (message.isSetBody()) {
			System.out.print("Body: " + message.getBody());
		}
}

通过 C# 使用 Amazon SQS

现在,您将使用 C# 将一个对象系列化到 XML,并将其作为 Amazon SQS 消息发送。

第一步是创建一个将被系列化的业务对象;清单 4 显示了一个 Product 对象。公共属性被控制 XML 系列化的属性修饰。C# 属性类似于 Java 注释,定义属性如何映射到 XML 元素或 XML 属性。此外,这个类包含将对象实例系列化到 XML 的 ToXml() 方法。

清单 4. 创建用于系列化的业务对象
namespace Stewart.Test
{
/// <summary>
/// Product
/// </summary>
[XmlRoot(ElementName="Product")]
public class Product
{
	/// <summary>
	/// Product Name
	/// </summary>
	[XmlElement("ProductName")]
	public string ProductName;

	/// <summary>
	/// Product Price
	/// </summary>
	[XmlElement("ProductPrice")]
	public decimal ProductPrice;

	/// <summary>
	/// Quantity in stock
	/// </summary>
	[XmlElement("InStock")]
	public bool InStock;

	/// <summary>
	/// Product Id
	/// </summary>
	[XmlAttributeAttribute(AttributeName = "Id", DataType = "integer")]
	public string Id;

	/// <summary>
	/// Initializes a new instance of the <see cref="Product"/> class.
	/// </summary>
	public Product()
	{
	}

	/// <summary>
	/// Initializes a new instance of the <see cref="Product"/> class.
	/// </summary>
	/// <param name="productName">Name of the product.</param>
	/// <param name="productPrice">The product price.</param>
	public Product(string productName, decimal productPrice)
	{
		this.ProductName = productName;
		this.ProductPrice = productPrice;
	}

	/// <summary>
	/// Converts to XML.
	/// </summary>
	/// <returns></returns>
	public String ToXml()
	{
		StringBuilder output = new StringBuilder();

		// no name space
		XmlSerializerNamespaces ns = new XmlSerializerNamespaces();
		ns.Add("", "");

		// settings to omit xml declaration
		XmlWriterSettings settings = new XmlWriterSettings();
		settings.OmitXmlDeclaration = true;

		// finally serialize to string
		XmlWriter writer = XmlTextWriter.Create(output, settings);
		XmlSerializer serializer = new XmlSerializer(typeof(Product));            
		serializer.Serialize(writer, this, ns);

		// return string containing XML document
		return output.ToString();
	}
}

接下来,发送 XML 消息。用于 Amazon SQS 的 Amazon C# API 在功能上类似于 Java API。清单 5 中的代码显示了如何使用 C# 发送消息。

清单 5. 使用 C# 发送消息
Product prod = new Product("Widget", 1.5M);
string accessKeyId = ConfigurationSettings.AppSettings["AmazonAccessKeyID"];
string secretAccessKey = ConfigurationSettings.AppSettings["AmazonSecretAccessKey"];

AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);
SendMessageRequest request = new SendMessageRequest();
request.MessageBody = prod.ToXml();
request.QueueName = "TestQueue";

SendMessageResponse response = service.SendMessage(request);
if (response.IsSetSendMessageResult())
{
	Console.WriteLine("Send Message Response: ");
	SendMessageResult sendMessageResult = response.SendMessageResult;
	if (sendMessageResult.IsSetMessageId())
	{
		Console.WriteLine(String.Format("MessageId {0}", 
			sendMessageResult.MessageId));
	}
	if (sendMessageResult.IsSetMD5OfMessageBody())
	{
		Console.WriteLine(String.Format("MD5OfMessageBody: {0}", 
			sendMessageResult.MD5OfMessageBody));
	}
}

图 1 显示了 清单 5 的输出结果。

图 1. 发送 XML 消息的输出
发送 XML 消息的输出的屏幕截图

最后一步是从队列接收 XML 消息,并反系列化实例。清单 6 显示了将 XML 消息反系列化到 Product 实例的代码。

清单 6. 反序列化 XML 消息
Product prod = null;            

string accessKeyId = ConfigurationSettings.AppSettings["AmazonAccessKeyID"];
string secretAccessKey = ConfigurationSettings.AppSettings["AmazonSecretAccessKey"];

AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

ReceiveMessageRequest request = new ReceiveMessageRequest();
request.QueueName = "TestQueue";
ReceiveMessageResponse response = service.ReceiveMessage(request);

if (response.IsSetReceiveMessageResult())
{
	Console.WriteLine("Receive Message Result:");
	ReceiveMessageResult receiveMessageResult = response.ReceiveMessageResult;
	List<Message> messageList = receiveMessageResult.Message;
	foreach (Message message in messageList)
	{                    
		if (message.IsSetMessageId())
		{
			Console.WriteLine(String.Format("MessageId: {0}",
				message.MessageId));
		}                                      
		if (message.IsSetBody())
		{
			Console.WriteLine(string.Format("Body: {0}", message.Body));
			String xml = message.Body;
			StringReader sr = new StringReader(xml);
			XmlSerializer serializer = new XmlSerializer(typeof(Product));
			prod = (Product) serializer.Deserialize(sr);
			Console.WriteLine(string.Format("Id: {0}", prod.Id));
			Console.WriteLine(string.Format("Name: {0}", prod.ProductName));
			Console.WriteLine(string.Format("Price: {0}", prod.ProductPrice));
		}
	}
}

图 2 显示了 清单 6 的输出结果。

图 2. 接收 XML 消息输出
接收 XML 消息输出的屏幕截图

尽管以上的例子非常简单,但是它们是非常强大的,因为您可以系列化一个对象,并向另一个不局限于本地物理网络的应用程序发送消息。这里没有复杂的防火墙限制或安全性考虑事项。此外,不需要用相同的语言编写消息的发送器和接收器,甚至不需要使用相同的平台。

技术概述和设计

这个示例解决方案包含需要集成业务流程的分销商和制造商。分销商制造商 处购买商品并出售给客户

当客户需要商品时,分销商使用 C# WinForm 客户端提交一个客户订单。订单提交程序将订单细节存储在一个本地 MySQL 数据库中。该客户端还允许用户浏览库存、查看订单和 Amazon SQS 队列。

一般而言,分销商的库存能够满足客户的订购需求。如果库存不足的话,分销商会及时向制造商发送一个购买订单。然后,当物品已经发出时,制造商发送回一个订单汇总。所有这些通信都使用 Amazon SQS 来完成。

分销商的 Order Fulfillment 和 Inventory Management 服务也是使用 C# 构建的,它轮询进入的商品和待处理的客户订单。当处理客户订单时发现商品库存少于订购数量,那么将使用 Amazon SQS 向制造商发送一个购买订单。队列项的消息体是一个包含购买订单的 XML 文档。

制造商的 Order Processing Service 是基于 Java 平台构建的,它处理购买订单队列。当物品已经发出时,它将使用 Amazon SQS 向分销商回复一条消息。该消息的消息体是包含订单汇总的 XML 文档。

图 3 显示了涉及到的系统。

图 3. 解决方案概图
显示了客户、分销商、Amazon SQS 和制造商的解决方案概图

创建 XML 模式

第一步是为在分销商和制造商之间发送的消息定义 XML 模式。您需要两个模式:一个购买订单和一个订单汇总。

清单 7 显示了购买订单的模式。

清单 7. 购买订单模式
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="PurchaseOrder">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="Id" type="xs:string" minOccurs="0" />
        <xs:element name="OrderDate" type="xs:string" minOccurs="0" />
        <xs:element name="Company" minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="CompanyName" type="xs:string" minOccurs="0" />
              <xs:element name="StreetAddress" type="xs:string" minOccurs="0" />
              <xs:element name="City" type="xs:string" minOccurs="0" />
              <xs:element name="State" type="xs:string" minOccurs="0" />
              <xs:element name="ZipCode" type="xs:string" minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element name="Vendor" minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="CompanyName" type="xs:string" minOccurs="0" />
              <xs:element name="StreetAddress" type="xs:string" minOccurs="0" />
              <xs:element name="City" type="xs:string" minOccurs="0" />
              <xs:element name="State" type="xs:string" minOccurs="0" />
              <xs:element name="ZipCode" type="xs:string" minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element name="Items" minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="Item" minOccurs="0" maxOccurs="unbounded">
                <xs:complexType>
                  <xs:attribute name="Id" type="xs:string" />
                  <xs:attribute name="Name" type="xs:string" />
                  <xs:attribute name="Quantity" type="xs:string" />
                </xs:complexType>
              </xs:element>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>  
</xs:schema>

Purchase Order XML 模式包含下列关键元素:

表 1. 购买订单模式中的关键元素
关键元素描述
Id包含 Purchase Order 的唯一标识符的字符串
OrderDate包含 Purchase Order 日期的字符串
Company包含分销商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码
Vendor包含制造商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码
Items包含订购商品的所有信息,包括商品 ID、商品名称和数量

清单 8 显示了订单汇总的模式。

清单 8. 订单汇总模式
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="OrderSummary">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="OrderId" type="xs:string" minOccurs="0" />
        <xs:element name="ReferenceId" type="xs:string" minOccurs="0" />
        <xs:element name="OrderDate" type="xs:string" minOccurs="0" />
        <xs:element name="CompanyAddress" minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="CompanyName" type="xs:string" minOccurs="0" />
              <xs:element name="StreetAddress" type="xs:string" minOccurs="0" />
              <xs:element name="City" type="xs:string" minOccurs="0" />
              <xs:element name="State" type="xs:string" minOccurs="0" />
              <xs:element name="ZipCode" type="xs:string" minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element name="CustomerAddress" minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="CompanyName" type="xs:string" minOccurs="0" />
              <xs:element name="StreetAddress" type="xs:string" minOccurs="0" />
              <xs:element name="City" type="xs:string" minOccurs="0" />
              <xs:element name="State" type="xs:string" minOccurs="0" />
              <xs:element name="ZipCode" type="xs:string" minOccurs="0" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element name="Items" minOccurs="0" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="Item" minOccurs="0" maxOccurs="unbounded">
                <xs:complexType>
                  <xs:attribute name="ItemId" type="xs:string" />
                  <xs:attribute name="ItemName" type="xs:string" />
                  <xs:attribute name="Quantity" type="xs:string" />
                </xs:complexType>
              </xs:element>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>  
</xs:schema>

Order Summary XML 模式包含下列关键元素:

表 2. 订单汇总模式中的关键元素
Id包含 Order Summary 的唯一标识符的字符串
ReferenceId包含初始 Purchase Order 的 ID 的字符串
OrderDate包含 Order Summary 日期的字符串
CustomerAddress包含分销商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码
VendorAddress包含制造商的关键地址信息,包括公司名称、街道地址、城市、州和邮政编码
Items包含订购商品的所有信息,包括商品 ID、商品名称和数量

定义数据库实体模型

接下来,我们定义数据库模式。图 4 显示了数据库实体模型。

图 4. 数据库实体模型
列出 customer、customerorder、customerorderdetail、inventory 和 vendororder 实体的数据库实体模型

Reseller 数据实体包括以下内容:

  • Customer 包含订单的客户联系信息。
  • CustomerOrder 包含客户订单的订单信息。
  • CustomerOrderDetail 包含客户订单的商品细节。
  • Inventory 包含分销商的库存。

Manufacturer 数据实体为:

  • VendorOrder 跟踪由制造商的 Order Processing Service 处理的购买订单。

定义消息队列

最后需要定义的组建是消息队列。表 3 显示了这个解决方案的消息队列。

表 3. Amazon SMS 消息队列
队列名可见性超时描述
POQueue30 秒从分销商发送给制造商的购买订单消息
OSQueue30 秒从制造商发送给分销商的订单汇总消息

分销商实现

分销商应用程序包含 表 4 中的业务实体。

表 4. 分销商应用程序中的业务实体
描述
CompanyAddressEntity包含一个客户或业务地址
CustomerEntity包含一个客户
OrderEntity包含一个客户订单
OrderDetailEntity包含客户订单的商品细节
InventoryItemEntity包含库存商品
PurchaseOrderEntity包含购买订单
PurchaseOrderDetailEntity包含购买订单的商品细节

对于 Microsoft .NET,BaseSQSDataProvider 类还为 Amazon SQS 消息提供一个实用程序类。BaseDbProvider 为使用 MySQL 数据库提供一个实用程序类。


分销商订单管理系统

分销商订单管理客户端不是本文的重点,但该客户端是相当简单的。它是用 C# 编写的,并且是一个 n 层应用程序,其中所有业务和数据都与表示层分离。所有表单绑定都使用 Microsoft .NET 强大的对象绑定功能完成。所有下拉列表、网格视图和表单字段都绑定到业务对象。Windows® 表单使用的代码量很少,因为业务逻辑都保存在业务逻辑层。

图 5 显示了这个客户端。

图 5. 分销商订单管理客户端
分销商订单管理客户端的屏幕截图

分销商订单完成服务

OrderFulfillmentService 类负责处理客户订单,并且根据在 App.config 文件中指定的轮询间隔进行处理。这个服务获取尚未处理的订单列表。对于列表中的每个订单,以下操作会发生在相应的方法中:

  • 检查是否可以配送订单商品 —— 即库存是否充足(ProcessPendingOrders 方法)。
  • 如果能够配送订单商品就处理该订单(CanShip 方法)。
  • 如果不能配送订单商品,就推迟订单,并向制造商发送购买订单(ProcessBackorder 方法)。

清单 9 显示了 ProcessPendingOrders() 方法。

清单 9. ProcessPendingOrders() 方法
	public int ProcessPendingOrders()
	{
		int itemsProcessed = 0;

		// get orders not yet shipped
		CustomerOrderFactory factory = new CustomerOrderFactory();
		IList<OrderEntity> orders = factory.GetOrdersNotYetShipped();

		// iterate through all orders not processed
		IEnumerator<OrderEntity> ordersEnum = orders.GetEnumerator();
		while (ordersEnum.MoveNext()) {
			// get next order
			OrderEntity curOrder = ordersEnum.Current;
			Console.WriteLine(string.Format("Processing Order '{0}'...", 
				curOrder.Id));

			// check if merchandise is available to ship
			if (this.CanShip(curOrder)) {
				// process order
				if (this.ProcessOrder(curOrder)) {
					itemsProcessed++;
				}
			}// if can ship order
			else if (!curOrder.IsBackordered){
				// set order to backordered
				if (this.ProcessBackorder(curOrder)) {
					itemsProcessed++;
				}
			} // if can't ship order (not enough merchandise)
		} // while more orders to process

		return itemsProcessed;
	}

为了确定订单能否被处理,将根据订购的每种商品核查库存是否充足。清单 10 显示了 CanShip() 方法。

清单 10. CanShip() 方法
private bool CanShip(OrderEntity order)
{
bool hasMerchandise = true;            

// get items
IEnumerator<OrderDetailEntity> detailEnum = order.GetOrderItems();

// iterate through all items
while (detailEnum.MoveNext())
{
	// get current item
	OrderDetailEntity detailEntry = detailEnum.Current;

	InventoryItemEntity inventoryItem = detailEntry.Item;
	if (detailEntry.Quantity > inventoryItem.Quantity)
	{
		Console.WriteLine(
			string.Format("Order {0} - Insufficient Inventory: {1} ({2})",
			order.Id, inventoryItem.Name, inventoryItem.Id));
		hasMerchandise = false;
	} // if inventory is sufficient
} // while more entries to process

Console.WriteLine(string.Format("Order {0} - Can Ship: {1}", 
	order.Id, hasMerchandise));
return hasMerchandise;
}

如果 CanShip() 返回 False 并且该订单还没有推迟,那么将调用 ProcessBackorder() 方法,并且订单的 IsBackordered 属性被设置为 True。您将使用 MessageQueueFactory 创建队列项并发送购买订单消息。清单 11 显示了这个过程。

清单 11. ProcessBackorder() 方法
private bool ProcessBackorder(OrderEntity order)
{
	// set to backordered
	order.IsBackordered = true;       

	// update order
	CustomerOrderFactory factory = new CustomerOrderFactory();
	bool result = factory.UpdateOrder(order);

	if (!result) return result;

	// get purchase order xml
	string poXml = this.GetPurchaseOrderAsXml(order);            

	// create message queue
	MessageQueueFactory queueFactory = new MessageQueueFactory();
	return queueFactory.CreatePOQueueItem(poXml);
}

通过 GetPurchaseOrder() 方法创建 PurchaseOrderEntity 对象。对于库存不足的每种商品,将向其订单添加一个 OrderDetailEntity清单 12 显示了这个过程。

清单 12. GetPurchaseOrder() 方法
private PurchaseOrderEntity GetPurchaseOrder(OrderEntity order)
{
PurchaseOrderEntity po = new PurchaseOrderEntity();

po.PurchaseDate = DateTime.Now;

// set company address of the Purchase Order - the Reseller
po.CompanyAddress.CompanyName = "The Widget Factory";
po.CompanyAddress.StreetAddress = "100 Main Street";
po.CompanyAddress.CityName = "Las Vegas";
po.CompanyAddress.StateName = "NV";
po.CompanyAddress.ZipCode = "89044";

// set vendor address of the Purchase Order - the Manufacturer
po.VendorAddress.CompanyName = "Widget Supplies";
po.VendorAddress.StreetAddress = "100 Main Street";
po.VendorAddress.CityName = "Orlando";
po.VendorAddress.StateName = "FL";
po.VendorAddress.ZipCode = "32801";

// while more items to process
IEnumerator<OrderDetailEntity> orderEnum = order.GetOrderItems();
while (orderEnum.MoveNext())
{
	OrderDetailEntity orderItem = orderEnum.Current;
	InventoryItemEntity inventoryItem = orderItem.Item;

	// if insufficient inventory
	if (orderItem.Quantity > inventoryItem.Quantity)
	{
		// order the number needed plus 100
		int quantityToOrder = (orderItem.Quantity - inventoryItem.Quantity) + 100;
		PurchaseOrderDetailEntity poItem = new PurchaseOrderDetailEntity();
		poItem.ItemName = inventoryItem.Name;
		poItem.ItemId = inventoryItem.Id;
		poItem.Quantity = quantityToOrder;

		// add item to po
		po.AddItem(poItem);
	}
}

return po;            
}

下一步就是将 PurchaseOrderEntity 系列化到 XML。Microsoft .NET Framework 提供强大的 XML 系列化功能。您可以创建 XmlSerializerNamespaces 的一个实例,以为输出文档设置 XML 名称空间。创建 XmlWriterSettings 的一个实例来控制 XML 输出。您希望从输出中忽略 XML 声明,因为它被嵌入到消息体中。您可以使用 XmlTextWriter 类将对象系列化到一个 XML 文本写器,后者将其输出写到 StringBuilder 的一个实例。最后,您可以使用 XmlSerializerSerialize() 方法将 PurchaseOrderEntity 的实例系列化到 XML。清单 13 显示了这个过程。

清单 13. GetPurchaseOrderAsXml() 方法
private string GetPurchaseOrderAsXml(OrderEntity order)
{
// get purchase order
PurchaseOrderEntity po = this.GetPurchaseOrder(order); ;

StringBuilder output = new StringBuilder();

// no name space
XmlSerializerNamespaces ns = new XmlSerializerNamespaces();
ns.Add("", "");

// settings to omit 
XmlWriterSettings settings = new XmlWriterSettings();
settings.OmitXmlDeclaration = true;

XmlWriter writer = XmlTextWriter.Create(output, settings);
XmlSerializer serializer = new XmlSerializer(typeof(PurchaseOrderEntity));
serializer.Serialize(writer, po, ns);

Debug.WriteLine(output.ToString());

return output.ToString();}

现在,使用 清单 14 中的代码将包含购买订单的消息发送到 Amazon SQS 队列。

清单 14. CreatePOQueueItem() 方法
public bool CreatePOQueueItem(String poXml)
{
MessageQueueSQSDataProvider queueFactory = DW4DataContext.MessageQueueData;
MessageQueueEntity message = new MessageQueueEntity();
message.MessageBody = poXml;
message.MessageType = MessageTypes.PurchaseOrder;
return queueFactory.InsertQueueItem(message);			
}

MessageQueueSQSDataProviderInsertQueueItem() 方法根据消息的类型在正确的队列中创建队列项。该方法调用基础方法 SendMessage() 将消息发送到 Amazon SQS 队列,如 清单 15 所示。

清单 15. InsertQueueItem() 方法
public bool InsertQueueItem(MessageQueueEntity message)
{
String queueName = null;
if (message.MessageType == MessageTypes.OrderSummary)
{
	queueName = ConfigurationSettings.AppSettings["OrderSummaryQueue"];
	return this.SendMessage(queueName, message.MessageBody);
}
else if (message.MessageType == MessageTypes.PurchaseOrder)
{
	queueName = ConfigurationSettings.AppSettings["PurchaseOrderQueue"];
	return this.SendMessage(queueName, message.MessageBody);
}

return false;            
}

SendMessage() 方法创建作为 Amazon API 的一部分的 SendMessageRequest。所需的参数为队列名和消息体(XML 文档)。如果 SendMessageResponse 不为 null,消息则被成功发送。接下来的几行写出了问题诊断的关键调试信息。清单 16 显示了完成后的 SendMessage() 方法。

清单 16. SendMessage() 方法
public bool SendMessage(string queueName, string messageBody) {
bool result = true;
try {
	String accessKeyId = 
		ConfigurationSettings.AppSettings["AmazonAccessKeyID"]; ;
	String secretAccessKey = 
		ConfigurationSettings.AppSettings["AmazonSecretAccessKey"];
	AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);

	// build request
	SendMessageRequest request = new SendMessageRequest();
	request.QueueName = queueName;
	request.MessageBody = messageBody;
	// send message
	SendMessageResponse response = service.SendMessage(request);
	if (response.IsSetSendMessageResult()) {
		Debug.WriteLine("Send Message Result:");
		SendMessageResult sendMessageResult = response.SendMessageResult;
		if (sendMessageResult.IsSetMessageId()) {
			Debug.WriteLine(String.Format("\tMessageId: {0}", 
				sendMessageResult.MessageId));
		}
		if (sendMessageResult.IsSetMD5OfMessageBody()) {
			Debug.WriteLine("\tMD5 Of Message Body: ", 
				sendMessageResult.MD5OfMessageBody);
		}
	}
	if (response.IsSetResponseMetadata()) {
		Debug.WriteLine("Response Metadata:");
		ResponseMetadata responseMetadata = response.ResponseMetadata;
		if (responseMetadata.IsSetRequestId()) {
			Debug.WriteLine(String.Format("\tRequest Id: {0}", 
				responseMetadata.RequestId));
		}
	}
}
catch (AmazonSQSException ex) {
	Debug.WriteLine("Caught Exception: " + ex.Message);
	Debug.WriteLine("Response Status Code: " + ex.StatusCode);
	Debug.WriteLine("Error Code: " + ex.ErrorCode);
	Debug.WriteLine("Error Type: " + ex.ErrorType);
	Debug.WriteLine("Request ID: " + ex.RequestId);
	Debug.WriteLine("XML: " + ex.XML);
	result = false;
}
return result;
}

分销商库存管理服务

InventoryManagementService 负责管理库存并处理来自制造商的订单汇总。类似于 OrderFulfillmentService,该服务使用在 App.config 文件中指定的轮询时间间隔。将发生以下行为:

  • ProcessIncomingMerchandise() 方法首先获取接收到但尚未处理的所有订单汇总的列表(OrderSummaryEntity)。
  • InventoryManagementService 方法获取来自 Amazon SQS OSQueue 的消息。
  • 然后,对每个未处理的商品发货调用 ProcessOrderReceipt() 方法。

清单 17 显示了 ProcessIncomingMerchandise() 方法如何获取列表。

清单 17. ProcessIncomingMerchandise() 方法
public int ProcessIncomingMerchandise()
{
int itemsProcessed = 0;

OrderSummaryFactory osFactory = new OrderSummaryFactory();
IList<OrderSummaryEntity> orders = osFactory.GetOrderSummariesToProcess();

// iterate through all order summaries
IEnumerator<OrderSummaryEntity> orderEnum = orders.GetEnumerator();
while (orderEnum.MoveNext())
{
	// get current order summary
	OrderSummaryEntity order = orderEnum.Current;

	// process order summary received
	if (this.ProcessOrderReceipt(order))
	{
		itemsProcessed++;
	}
}

Debug.WriteLine(String.Format("Orders Processed: {0}", itemsProcessed));
return itemsProcessed;
}

ProcessOrderReceipt() 方法遍历订单汇总中的每个项(OrderSummaryDetailEntity),并增加接收到的商品的库存数量。清单 18 显示了这个方法。

清单 18. ProcessOrderReceipt() 方法
private bool ProcessOrderReceipt(OrderSummaryEntity order)
{
	if (order == null) return false;

	bool result = true;

	// add to inventory
	InventoryFactory inventoryFactory = new InventoryFactory();
	
	// iterate through all items in order summary
	IEnumerator<OrderSummaryDetailEntity> itemsEnum = order.Items.GetEnumerator();
	while (itemsEnum.MoveNext())
	{
		// get current item
		OrderSummaryDetailEntity orderItem = itemsEnum.Current;
		
		// get item
		int itemId = orderItem.ItemId;
		InventoryItemEntity item = inventoryFactory.GetInventoryItem(itemId);

		// increase inventory
		item.Quantity = item.Quantity + orderItem.Quantity;
		result = inventoryFactory.UpdateInventoryItem(item);
		if (!result) break;
	}

	if (result)
	{
		MessageQueueFactory queueFactory = new MessageQueueFactory();
		queueFactory.DeleteQueueItem(order.QueueItem);
	}

	return result;
}

最后一个步骤是,当成功处理库存发货之后,将订单汇总队列项从消息队列中删除。清单 19 显示了如何使用 C# 从 Amazon SQS 队列删除消息。

清单 19. DeleteMessage() 方法
public bool DeleteMessage(string queueName, string messageHandle) {
bool result = false;
try {
  String accessKeyId = 
           ConfigurationSettings.AppSettings["AmazonAccessKeyID"];
  String secretAccessKey = 
           ConfigurationSettings.AppSettings["AmazonSecretAccessKey"];
  AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);
 
  // Create delete message request object
  DeleteMessageRequest request = new DeleteMessageRequest();
  
  // Set queue name containing item to be deleted
  request.QueueName = queueName;
  
  // Set handle of message to be deleted
  request.ReceiptHandle = messageHandle;
  
  // Delete message
  DeleteMessageResponse response = service.DeleteMessage(request);
  Debug.WriteLine("Service Response");                
 
  Debug.WriteLine("\tDelete Message Response");
  // If message response
  if (response.IsSetResponseMetadata())
  {
           Debug.WriteLine("\tResponse Metadata");
           ResponseMetadata responseMetadata = response.ResponseMetadata;
           if (responseMetadata.IsSetRequestId())
           {
                   Debug.WriteLine(String.Format("\tRequest Id: {0}", 
                            responseMetadata.RequestId));
           }
  }
}
catch (AmazonSQSException ex)
{
  Debug.WriteLine("Caught Exception: " + ex.Message);
  Debug.WriteLine("Response Status Code: " + ex.StatusCode);
  Debug.WriteLine("Error Code: " + ex.ErrorCode);
  Debug.WriteLine("Error Type: " + ex.ErrorType);
  Debug.WriteLine("Request ID: " + ex.RequestId);
  Debug.WriteLine("XML: " + ex.XML);
}
 
return result;
}

制造商实现

该解决方案的最后一个组件是制造商的 Order Processing Service,它是基于 Java 的服务,负责处理来自分销商的订单。Order Processing Service:

  • 轮询购买订单队列,该队列包含由分销商发送的购买订单。
  • 在收到购买订单之后,它创建一个订单汇总,并使用另一个 Amazon SQS 队列将其发送回给分销商。
  • 删除分销商发送来的购买订单。

制造商应用程序包含 表 5 中的业务实体。

表 5. 制造商应用程序中的业务实体
描述
AddresssEntity包含一个客户或业务地址
CustomerOrderEntity包含客户订单(购买订单)
CustomerOrderDetailEntity包含每个客户订单(购买订单)的商品细节
MessageQueueEntity包含消息队列项(即购买订单或订单汇总)
OrderSummaryEntity包含订单发货汇总
OrderSummaryDetailEntity包含订单发货汇总的商品细节

这个解决方案对所有数据库操作使用 JdbcQuery 帮助类。此外,还使用另外两个帮助类:DateUtil 用于格式化数据;AppConfig 用于读取配置参数。SqsDataProvider 类同时也是提供 Amazon SQS 功能的实用程序类。


制造商订单处理服务

OrderProcessingService 类负责根据在 AppConfig.properties 文件中指定的轮询时间间隔处理购买订单。该服务获取尚未发货的购买订单列表,并处理这些订单。清单 20 中的 getMessageQueueItems 方法展示了如何使用 Java 代码从特定的 Amazon SQS 队列获取消息列表。

清单 20. getMessageQueueItems() 方法
protected ArrayList<HashMap<String, String>> getMessageQueueItems(String queueName) {
ArrayList<HashMap<String, String>> results = new ArrayList<HashMap<String, String>>();
 
try {
  String accessKeyId = AppConfig.getProperty("AmazonAccessKeyID");
  String secretAccessKey =  AppConfig.getProperty("AmazonSecretAccessKey");
 
  AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);
  
  // get messages from queue
  ReceiveMessageRequest request = new ReceiveMessageRequest();
  request.setQueueName(queueName);
  request.setVisibilityTimeout(1);
  
  ReceiveMessageResponse response = service.receiveMessage(request);
  
  // if received response
  if (response.isSetReceiveMessageResult()) {
           System.out.println("\tReceive Message Result");
           ReceiveMessageResult receiveMessageResult = 
                   response.getReceiveMessageResult();
           
           // get all messages and iterate through them
           List<Message> messageList = receiveMessageResult.getMessage();
           for (Message message : messageList) {
                   // build hashmap containing each message
                   HashMap<String, String> row = new HashMap<String, String>();
                   
                   if (message.isSetMessageId()) {
                            row.put(COLUMN_ID, message.getMessageId());
                            System.out.println("\t\tMessageId: " + 
                                     message.getMessageId());
                   }
                   if (message.isSetReceiptHandle()) {
                            row.put(COLUMN_HANDLE, message.getReceiptHandle());
                   }
                   if (message.isSetBody()) {                                   
                            row.put(COLUMN_MESSAGE, message.getBody());
                   }
                   
                   // add row
                   results.add(row);
           }
  }        
} catch (AmazonSQSException ex) {
  System.out.println("Caught Exception: " + ex.getMessage());
  System.out.println("Response Status Code: " + ex.getStatusCode());
  System.out.println("Error Code: " + ex.getErrorCode());
  System.out.println("Error Type: " + ex.getErrorType());
  System.out.println("Request ID: " + ex.getRequestId());
  System.out.println("XML: " + ex.getXML());
}
 
return results;
}

Java 技术不像 .NET 框架那样具有内置的 XML 系列化功能,因此使用 JDOM API 来生成 XML 文档。JDOM 类似于 DOM,但它提供一个简单的 Java API 来解析、操作和输出 XML。参见 参考资料 了解关于如何使用 JDOM 的信息。

PurchaseOrderSerializer 类负责将消息中的 XML 内容反序列化到 Java 类。您将使用 JDOM 来解析 XML 内容,如 清单 21 所示。(对于理解这段代码,参考 清单 7 中的购买订单模式会有帮助)。

清单 21. PurchaseOrderSerializer 类
public class PurchaseOrderSerializer {

public CustomerOrderEntity deserializePO(MessageQueueEntity message) {
  CustomerOrderEntity order = null;
  
  String xml = message.getMessageBody();
  System.out.println("Purchase Order:\n" + xml);
  
  try {
           // Create input source from string containing XML
           InputSource xmlIS = new InputSource();
           xmlIS.setCharacterStream(new StringReader(xml));
           
           // Initialize SAX Builder object
           SAXBuilder builder = new SAXBuilder();
 
           // Build JDOM Document object from input source
           Document doc = builder.build(xmlIS);
           Element rootElem = doc.getRootElement();
           
           // get order id
           String id = rootElem .getChildText("Id");        
           
           // create customer order 
           order = new CustomerOrderEntity(id);
           
           // set order date
           String dateStr = rootElem.getChildText("OrderDate");
           order.setOrderDate(DateUtil.getDate(dateStr));
            
           // get company (customer) address element node and parse it
           Element addrElem = rootElem.getChild("Company");
           this.setCompanyAddress(order, addrElem);

           // get vendor address element node and parse it
           addrElem = rootElem.getChild("Vendor");
           this.setVendorAddress(order, addrElem);

           // get order items list and parse it
           Element itemsElem = rootElem.getChild("Items");
           this.setOrderItems(order, itemsElem);
           
           System.out.println(order.toString());
  } catch (IOException e) {
           e.printStackTrace();
  } catch (NullPointerException e) {
           e.printStackTrace();
  } catch (JDOMException e) {
           e.printStackTrace();
  }
  return order;
}
 
private void setVendorAddress(CustomerOrderEntity order, Element addrElem) {
  // get data from xml
  String coName = addrElem.getChildText("CompanyName");
  String streetAddress = addrElem.getChildText("StreetAddress");
  String city = addrElem.getChildText("City");
  String state = addrElem.getChildText("State");
  String zipCode = addrElem.getChildText("ZipCode");
  
  AddressEntity coAddress = order.getVendorAddress();
   
  // build address entity object using data read from xml
  AddressEntity coAddress = order.getCompanyAddress();
  coAddress.setCity(city);
  coAddress.setCompanyName(coName);
  coAddress.setStreetAddress(streetAddress);
  coAddress.setState(state);
  coAddress.setZipCode(zipCode);
}
  
private void setCompanyAddress(CustomerOrderEntity order, Element addrElem) {
  // get data from xml
  String coName = addrElem.getChildText("CompanyName");
  String streetAddress = addrElem.getChildText("StreetAddress");
  String city = addrElem.getChildText("City");
  String state = addrElem.getChildText("State");
  String zipCode = addrElem.getChildText("ZipCode");
  
  // build address entity object using data read from xml
  AddressEntity coAddress = order.getCompanyAddress();
  coAddress.setCity(city);
  coAddress.setCompanyName(coName);
  coAddress.setStreetAddress(streetAddress);
  coAddress.setState(state);
  coAddress.setZipCode(zipCode);
}
 
private void setOrderItems(CustomerOrderEntity order, Element itemsElem) {
  List itemList = itemsElem.getChildren();
  if (itemList == null) return;
  
  // iterate through all items in collection
  for (int index = 0; index < itemList.size(); index++) {
           // get current element
           Element curElem = (Element) itemList.get(index);
           
           // get item values
           String itemId = curElem.getAttributeValue("Id");
           String itemName = curElem.getAttributeValue("Name");
           String quantity = curElem.getAttributeValue("Quantity");
           
           // create order item
           CustomerOrderDetailEntity detail = new CustomerOrderDetailEntity();
           detail.setItemId(itemId);
           detail.setItemName(itemName);
           detail.setQuantity(Integer.parseInt(quantity));
  
           // add order item to order summary
           order.addItem(detail);
  }
}
}

接下来,为购买订单构建订单汇总。清单 22 显示了所需的代码。

清单 22. getOrderSummaryForCustomerOrder() 方法
public OrderSummaryEntity getOrderSummaryForCustomerOrder(
	CustomerOrderEntity customerOrder) {
	
	if (customerOrder == null) return null;
	
	// create order
	OrderSummaryEntity orderSummary = new OrderSummaryEntity();
	orderSummary.setOrderDate(new Date());
	orderSummary.setReferenceId(customerOrder.getId());
	
	// set addresses
	orderSummary.setCompanyAddress(customerOrder.getVendorAddress());
	orderSummary.setCustomerAddress(customerOrder.getCompanyAddress());
	
	// add items
	Iterator<CustomerOrderDetailEntity> itemIter = customerOrder.getItems();
	while(itemIter.hasNext()) {
		CustomerOrderDetailEntity item = itemIter.next();
		
		OrderSummaryDetailEntity detail = new OrderSummaryDetailEntity();
		detail.setItemId(item.getItemId());
		detail.setItemName(item.getItemName());
		detail.setQuantity(item.getQuantity());
		
		orderSummary.addItem(detail);
	}
	
	return orderSummary;
}

然后,您必须将 OrderSummaryEntity 系列化到 XML。您需要使用 JDOM 系列化该消息,如 清单 23 所示。(参考 清单 8 中的订单汇总模式会有帮助)。

清单 23. OrderSummarySerializer 类
public class OrderSummarySerializer {
	public String serializeOrder(OrderSummaryEntity order) {
		Document doc = new Document();
		
		Element rootElem = new Element("OrderSummary");
		doc.addContent(rootElem);
		
		// add id
		Element idElem = new Element("OrderId");
		idElem.setText(order.getOrderId());
		rootElem.addContent(idElem);
		
		// add reference id
		Element referenceIdElem = new Element("ReferenceId");
		referenceIdElem.setText(order.getReferenceId());
		rootElem.addContent(referenceIdElem);
		
		// add order date
		Element dateElem = new Element("OrderDate");
		String dateStr = DateUtil.getDateString(new Date());
		dateElem.setText(dateStr);		
		rootElem.addContent(dateElem);
		
		// set company address
		Element addrElem = this.getElementForAddress("CompanyAddress", 
			order.getCompanyAddress());
		rootElem.addContent(addrElem);
		
		// set customer address
		addrElem = this.getElementForAddress("CustomerAddress", 
			order.getCustomerAddress());
		rootElem.addContent(addrElem);
		
		// iterate through all items and add each item to order
		Element itemsElem = new Element("Items");
		Iterator<OrderSummaryDetailEntity> itemIter = order.getItems();
		while(itemIter.hasNext()) {
			OrderSummaryDetailEntity item = itemIter.next();
			
			Element itemElem = new Element("Item");
			itemElem.setAttribute("ItemId", item.getItemId());
			itemElem.setAttribute("ItemName", item.getItemName());
			String quantityStr = String.valueOf(item.getQuantity());
			itemElem.setAttribute("Quantity", quantityStr);
			
			itemsElem.addContent(itemElem);
		}
		rootElem.addContent(itemsElem);
		
		// convert xml document to string
		XMLOutputter outp = new XMLOutputter();
		StringWriter strWriter = new StringWriter();        
		try {
			outp.output(doc, strWriter);
		} catch (IOException e) {
			e.printStackTrace();
		}		
		return strWriter.toString();
	}

	private Element getElementForAddress(String elemName, AddressEntity address) {
		Element addrElem = new Element(elemName);
		
		String coName = address.getCompanyName();
		String streetAddress = address.getStreetAddress();
		String cityName = address.getCity();
		String stateName= address.getState();
		String zipCode = address.getZipCode();
		
		Element coElem = new Element("CompanyName");
		coElem.setText(coName);
		addrElem.addContent(coElem);
		
		Element streetElem = new Element("StreetAddress");
		streetElem.setText(streetAddress);
		addrElem.addContent(streetElem);
		
		Element cityElem = new Element("City");
		cityElem.setText(cityName);
		addrElem.addContent(cityElem);
		
		Element stateElem = new Element("State");
		stateElem.setText(stateName);
		addrElem.addContent(stateElem);
		
		Element zipCodeElem = new Element("ZipCode");
		zipCodeElem.setText(zipCode);
		addrElem.addContent(zipCodeElem);
		
		return addrElem;
	}
}

使用 Amazon Java API 将 XML 消息发送到 Amazon SQS 队列。清单 24 显示了如何发送消息。

清单 24. sendMessage() 方法
protected boolean sendMessage(String queueName, String messageBody) {
boolean result = false;
try {
	String accessKeyId = AppConfig.getProperty("AmazonAccessKeyID");
	String secretAccessKey =  AppConfig.getProperty("AmazonSecretAccessKey");

	AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);
	
	SendMessageRequest request = new SendMessageRequest();
	request.setQueueName(queueName);
	request.setMessageBody(messageBody);
	
	SendMessageResponse response = service.sendMessage(request);
	// Send Message Response
	if (response.isSetSendMessageResult()) {
		System.out.println("\tSend Message Result");
		SendMessageResult sendMessageResult = response.getSendMessageResult();
		if (sendMessageResult.isSetMessageId()) {
			System.out.println("\t\tMessageId: " + 
				sendMessageResult.getMessageId());
			result = true;
		}
	} 		
} catch (AmazonSQSException ex) {
	System.out.println("Caught Exception: " + ex.getMessage());
	System.out.println("Response Status Code: " + ex.getStatusCode());
	System.out.println("Error Code: " + ex.getErrorCode());
	System.out.println("Error Type: " + ex.getErrorType());
	System.out.println("Request ID: " + ex.getRequestId());
	System.out.println("XML: " + ex.getXML());
}

return result;
}

最后,您必须从 Amazon SQS 队列删除购买订单队列项。清单 25 显示了如何删除消息。

清单 25. deleteMessage() 方法
protected boolean deleteMessage(String queueName, String handle) {
boolean result = false;

try {
	String accessKeyId = AppConfig.getProperty("AmazonAccessKeyID");
	String secretAccessKey =  AppConfig.getProperty("AmazonSecretAccessKey");

	AmazonSQS service = new AmazonSQSClient(accessKeyId, secretAccessKey);
	DeleteMessageRequest request = new DeleteMessageRequest();
	request.setQueueName(queueName);
	request.setReceiptHandle(handle);
	
	DeleteMessageResponse response = service.deleteMessage(request);

	// Delete Message Response
	if (response.isSetResponseMetadata()) {
		// Response Metadata
		ResponseMetadata  responseMetadata = response.getResponseMetadata();
		if (responseMetadata.isSetRequestId()) {
			System.out.print("RequestId: " + 
				responseMetadata.getRequestId());
			result = true;
		}
	} 
} catch (AmazonSQSException ex) {
	System.out.println("Caught Exception: " + ex.getMessage());
	System.out.println("Response Status Code: " + ex.getStatusCode());
	System.out.println("Error Code: " + ex.getErrorCode());
	System.out.println("Error Type: " + ex.getErrorType());
	System.out.println("Request ID: " + ex.getRequestId());
	System.out.println("XML: " + ex.getXML());
}

return result;
}

测试解决方案

现在,您可以测试这个解决方案了。为此,为库存不足的商品提交一个订单。然后,您可以跟踪整个过程:

  • 提交订单
  • 分销商推迟订单并创建一个购买订单
  • 制造商接收订单并向分销商发送订单汇总
  • 分销商收到订单汇总并处理推迟的订单

图 6 中的分销商客户端在添加新订单之前显示库存余量。

图 6. 浏览库存
列出库存商品及其数量的库存管理窗口的屏幕截图

图 7 显示了订单提交窗口。

图 7. 提交订单
Customer Order 窗口屏幕截图,订单请求的商品数量大于库存列出的数量

图 8 显示了 Browse Orders 窗口。

图 8. 浏览订单
列出已处理和未处理订单的 Browse Orders 窗口屏幕截图,还列出了 ID、客户名、订单日期、发货日期和订单是否被推迟

在添加了新的订单之后,图 9 中的窗口显示了当该订单被推迟时的订单完成服务的输出。

图 9. 分销商订单完成和库存管理服务
分销商订单完成和库存管理服务的输出消息的屏幕截图

在添加了新的订单之后,图 10 中的窗口显示了 POQueue 消息队列的内容。

图 10. 查看消息队列
带有消息的 View Queues 窗口的屏幕截图

制造商接收来自消息队列的购买订单并处理它。图 11 显示了制造商的订单处理系统的输出。

图 11. 制造商的订单管理服务
制造商的订单管理服务发出的消息的屏幕截图

结束语

本文演示了如何利用 XML 和 Amazon SQS Web 服务集成企业业务应用程序。通过将 XML 的扩展性和 Amazon SQS 结合起来可以提供灵活的、伸缩性强的应用程序。这个解决方案的主要益处是伸缩性、扩展性和集成的便捷性。该集成不需要考虑任何防火墙或安全问题。这个解决方案的最后一个好处是 —— 同时也是 Web 服务的关键驱动力 —— 它是跨平台的,并且不受所采用技术的技术的影响。制造商和分销商在集成业务流程以促进运营效率和提高投资收益时,仍然可以保留现有的技术平台。

这个解决方案也存在一些局限性。首先,Amazon SQS 的消息体的最大尺寸为 8KB。您可以通过其他可行的办法绕过这一限制:

  • 使用 Amazon Simple Storage Service (Amazon S3) 存储大型 XML 文档,并为 Amazon SQS 消息中的 Amazon S3 项添加一个句柄。
  • 使用 Amazon SimpleDB 存储结构化数据,并为 Amazon SQS 消息中的项添加句柄。

第二个限制是 Amazon 应用于队列的最终一致性方法。这有好的一面也有坏的一面,但 Amazon 肯定认为其好处是多于坏处的。该解决方案的最后一个限制就是没有优先级或 FIFO 方法。再次强调一下,Amazon 是有意这样设计的,因为它能提供伸缩性更强、可用性更高以及更可靠的解决方案。

如果您在消息体中使用 XML 进行数据交换,最佳实践建议您执行 XML Schema (XSD) 验证,以确保发送器和接收器使用相同的语言。就像在其他 XML 集成项目中一样,发送器和接收器需要使用通用的模式。


下载

描述名字大小
分销商应用程序 C# 项目NETSolution.zip223KB
制造商应用程序 Java 项目JavaSolution.zip24KB
配置脚本DbScripts.zip1KB

参考资料

学习

  • Amazon SQS 技术文档:查看关于 Amazon SQS 库的信息。
  • JDOM 文档:了解关于 JDOM 库的信息,以及这个基于 Java 的解决方案如何从 Java 代码中访问、操作和输出 XML 数据。
  • 连接到云,第 3 部分:云治理和安全性(Mark O'Neill,developerWorks,2009 年 6 月):这篇文章很好的描述了 Amazon SQS 使用的密匙,并探索了如何设置监管策略,以保护敏感信息和监控云应用程序的访问和使用。在该系列的 连接到云,第 1 部分:在应用程序中使用云 中,考察了云计算服务和基础设施的混合示例(Mark O'Neill,developerWorks,2009 年 3 月)。在本系列的 连接到云,第 2 部分:实现混合云模型 中开发了一个混合应用程序(Mark O'Neill,developerWorks,2009 年 4 月)。
  • Eventual consistency:参考 Werner Vogels (Amazon CTO) 关于 Amazon SQS 设计原则上的最终一致性的博客条目。
  • Simplify XML programming with JDOM(Wes Biggs 和 Harry Evans,developerWorks,2001 年 5 月):JDOM 类似于 DOM,但它提供一个简单的 Java API 来解析、操作和输出 XML。了解如何使用这个开源 API,让 Java 开发人员更加轻松地操作 XML 文档。
  • IBM XML 认证:了解如何才能成为一名 IBM 认证的 XML 和相关技术的开发人员。
  • XML 技术库:访问 developerWorks XML 专区,获得广泛的技术文章和技巧、教程、标准和 IBM 红皮书。
  • developerWorks 技术活动网络广播:随时关注技术的最新进展。
  • developerWorks podcasts:收听针对软件开发人员的有趣访谈和讨论。

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=XML, Web development
ArticleID=416601
ArticleTitle=利用 Amazon Web Services 集成企业应用程序
publish-date=07272009