级别: 初级 Babu Sundaram (babu@cs.uh.edu), 网格专家, Consultant
2005 年 8 月 19 日 WS-Notification 为系统中的组件提供了一种彼此进行通信的方法,但实现它可能有一些不必要的复杂。幸运的是,并非一定要如此。本文将简要介绍 Globus Toolkit 4(GT4)中内建的通知支持,它允许在网格实体之间传递通知消息,并定义服务提供者所期望的接口和通知消息的处理程序。本文将以一个简单的网格服务为例,展示如何向网格服务中添加通知支持,这样它就可以充当一个通知生产者;本文还将介绍如何构建订阅和消费这些通知消息的客户机。
Globus Toolkit 中的通知支持
对象或 Web 服务之间的交互和通信通常都包括通知。在网格环境中,事件驱动通知消息也非常关键。例如,网格资源的系统管理员可能希望,无论何时资源上的负载超过某个阈值,都会收到一条消息。WS-Notification 是一系列的规范,它们定义了一个标准的基于 Web 服务的方法来解决这个通知问题。GT4 实现了 WS-Notification 家族的规范,提供了对网格实体之间的标准通知消息交换的支持,并提供了一个用来组织消息的 XML 模型。
本文假设您熟悉 WS-Notification 家族规范的基本概念,并将重点介绍如何使用 GT4 实现这些规范。有关这一家族规范的更多信息,请参阅“使用 WS-Notification”。要使用本文中的代码,则需要熟悉 Java™ 编程语言,并且还需要了解 GT4 支持语言的概念。
设置
我们将演示为一个网格服务添加通知支持所需要的步骤,方法是向一个 WSRF 兼容的拍卖服务添加一些通知支持。我们将创建一个拍卖服务,设置一台客户机来观察拍卖的最新价格,并了解我们在竞价时会发生什么。
有关安装 GT4 的完整信息,请参阅 参考资料 中 GT4 的文档。更多的简要介绍,则请参阅“GT4 安全性简介”。要确保在完成设置之后,已经设置了 JAVA_HOME、ANT_HOME 和 GLOBUS_LOCATION 变量,并运行了 $GLOBUS_LOCATION/etc/globus-user-env.sh 脚本来设置其他环境变量。
现在让我们来了解一下网格服务文件本身。要查看实际程序的构建过程,请参阅教程“理解 WSRF,第 3 部分:使用 WS-Notification 进行发布-订阅”,我们真正需要做的是从一个基本可以运行的应用程序代码着手开始工作。
为开发文件选择一个位置,例如 /home/globus/auction。从现在开始,我们将称这个位置为 AUCTION_HOME。在将源文件(参见后面的“下载”)解压到这个目录中时,会看到以下目录结构:
<AUCTION_HOME>/etc
<AUCTION_HOME>/schema
<AUCTION_HOME>/schema/tutorial
<AUCTION_HOME>/src |
编辑 AUCTION_HOME 目录中的 build.xml 文件,使 GLOBUS_LOCATION 指向 GT4 的安装路径。
为网格服务添加通知支持
完成这些设置之后,就可以为这个 Auction 网格服务添加订阅支持来获得通知了。首先,我们先在描述 Auction 服务接口的 WSDL 文件中添加订阅支持。在下面的代码中,为了方便阅读,我们使用黑体表示要做的修改。使用任何文本编辑器打开 <AUCTION_HOME>/schema/tutorial/auction_port_type.wsdl 文件,并添加以下内容:
<definitions name="Auction"
targetNamespace="http://tutorial.globus.org/auction"
...
xmlns:wsntw=
"http://docs.oasis-open.org/wsn/2004/06/wsn-
WS-BaseNotification-1.2-draft-01.wsdl"
xmlns="http://schemas.xmlsoap.org/wsdl/">
<import
namespace="http://docs.oasis-open.org/wsrf/2004/06/wsrf-
WS-BaseFaults-1.2-draft-01.wsdl"
location="../wsrf/faults/WS-BaseFaults.wsdl"/>
<import
namespace="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceProperties-1.2-draft-01.wsdl"
location="../wsrf/properties/WS-ResourceProperties.wsdl"/>
<import
namespace="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ServiceGroup-1.2-draft-01.wsdl"
location="../wsrf/servicegroup/WS-ServiceGroup.wsdl"/>
<import
namespace="http://docs.oasis-open.org/wsn/2004/06/wsn-WS-
BaseNotification-1.2-draft-01.wsdl"
location="../wsrf/notification/WS-BaseN.wsdl" />
<types>
<schema
targetNamespace="http://tutorial.globus.org/auction"
xmlns:tns="http://tutorial.globus.org/auction"
xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<import
namespace="http://schemas.xmlsoap.org/ws/2004/03/addressing"
schemaLocation="../ws/addressing/WS-Addressing.xsd"/>
<import
namespace="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ServiceGroup-1.2-draft-01.xsd"
schemaLocation="../wsrf/servicegroup/WS-ServiceGroup.xsd"/>
<element name="message" type="xsd:string"/>
<element name="lastModified" type="xsd:dateTime"/>
<element name="AuctionResourceProperties">
<complexType>
<sequence>
<element ref="tns:message"/>
<element ref="tns:lastModified"/>
</sequence>
</complexType>
</element>
<element name="Write" type="xsd:string" />
<element name="WriteResponse">
<complexType/>
</element>
<element name="Create">
<complexType/>
</element>
<element name="CreateResponse" type="wsa:
EndpointReferenceType" />
</schema>
</types>
<message name="WriteRequest">
<part name="WriteRequest"
element="tns:Write" />
</message>
<message name="WriteResponse">
<part name="WriteResponse"
element="tns:WriteResponse" />
</message>
<message name="CreateRequest">
<part name="CreateRequest"
element="tns:Create" />
</message>
<message name="CreateResponse">
<part name="CreateResponse"
element="tns:CreateResponse" />
</message>
<portType name="AuctionPortType"
wsrp:ResourceProperties="AuctionResourceProperties">
<operation name="GetResourceProperty">
<input name="GetResourcePropertyRequest"
message="wsrpw:GetResourcePropertyRequest"
wsa:Action="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceProperties/
GetResourceProperty"/>
<output name="GetResourcePropertyResponse"
message="wsrpw:GetResourcePropertyResponse"
wsa:Action="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceProperties/
GetResourcePropertyResponse"/>
<fault name="InvalidResourcePropertyQNameFault"
message="wsrpw:InvalidResourcePropertyQNameFault"/>
<fault name="ResourceUnknownFault" message="wsrpw:
ResourceUnknownFault"/>
</operation>
<operation name="Write">
<input name="WriteRequest"
message="tns:WriteRequest" />
<output name="WriteResponse"
message="tns:WriteResponse" />
</operation>
<operation name="QueryResourceProperties">
<input name="QueryResourcePropertiesRequest"
message="wsrpw:QueryResourcePropertiesRequest"
wsa:Action="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceProperties
/QueryResourceProperties"/>
<output name="QueryResourcePropertiesResponse"
message="wsrpw:QueryResourcePropertiesResponse"
wsa:Action="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceProperties
/QueryResourcePropertiesResponse"/>
<fault name="ResourceUnknownFault"
message="wsrpw:ResourceUnknownFault" />
<fault name="InvalidResourcePropertyQNameFault"
message="wsrpw:InvalidResourcePropertyQNameFault" />
<fault name="UnknownQueryExpressionDialectFault"
message="wsrpw:UnknownQueryExpressionDialectFault" />
<fault name="InvalidQueryExpressionFault"
message="wsrpw:InvalidQueryExpressionFault" />
<fault name="QueryEvaluationErrorFault"
message="wsrpw:QueryEvaluationErrorFault" />
</operation>
<operation name="Create">
<input name="CreateRequest"
message="tns:CreateRequest" />
<output name="CreateResponse"
message="tns:CreateResponse" />
</operation>
<operation name="Destroy">
<input message="wsrlw:DestroyRequest"
wsa:Action="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceLifetime
/Destroy"/>
<output message="wsrlw:DestroyResponse"
wsa:Action="http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-
ResourceLifetime
/DestroyResponse"/>
<fault
message="wsrlw:ResourceNotDestroyedFault"
name="ResourceNotDestroyedFault"/>
<fault message="wsrlw:ResourceUnknownFault"
name="ResourceUnknownFault"/>
</operation>
<!-The "subscribe" operation messages and types are
defined here -->
<operation name="Subscribe">
<input message="wsntw:SubscribeRequest"
wsa:Action="http://docs.oasis-open.org/wsn/2004/06/wsn-
WS-BaseNotification/Subscribe"/>
<output message="wsntw:SubscribeResponse"
wsa:Action="http://docs.oasis-open.org/wsn/2004/06/wsn-
WS-BaseNotification/SubscribeResponse"/>
<fault name="TopicPathDialectUnknownFault"
message="wsntw:TopicPathDialectUnknownFault"/>
<fault name="SubscribeCreationFailedFault"
message="wsntw:SubscribeCreationFailedFault"/>
<fault name="ResourceUnknownFault"
message="wsntw:ResourceUnknownFault"/>
</operation>
</portType>
</definitions>
|
SubscribeRequest、SubscribeResponse、
TopicPathDialectUnknownFault 和 SubscribeCreationFailedFault 消息都是在 WS-Notification 包含的文件中定义的,因此这段代码会为该服务添加一些新功能。
接下来,需要在 Auction 类中定义一些必需的成员变量和操作。然后将这些内容添加到 <AUCTION_HOME>/src/org/globus/tutorial/auction/Auction.java 中。现在这个服务包含一个 TopicList 类型的私有成员,该成员提供了一种组织通知消息的机制。这样,我们就可以添加该实例的多个与资源有关的属性作为这个 TopicList 的一部分。而且,我们还实现了一个公共方法,在查询结束时,该方法将返回 TopicList。
package org.globus.tutorial.auction;
import org.globus.wsrf.Resource;
import org.globus.wsrf.ResourceIdentifier;
import org.globus.wsrf.ResourceProperty;
import org.globus.wsrf.ResourcePropertySet;
import org.globus.wsrf.ResourceProperties;
import org.globus.wsrf.Topic;
import org.globus.wsrf.TopicList;
import org.globus.wsrf.TopicListAccessor;
import org.globus.wsrf.impl.ResourcePropertyTopic;
import org.globus.wsrf.impl.SimpleTopicList;
import org.globus.wsrf.impl.ReflectionResourceProperty;
import org.globus.wsrf.impl.SimpleResourcePropertyMetaData;
import org.globus.wsrf.impl.SimpleResourcePropertySet;
import org.globus.wsrf.impl.SimpleResourceProperty;
import java.util.Calendar;
public class Auction implements Resource, ResourceProperties,
TopicListAccessor, ResourceIdentifier
{
// this is the initial message that all auction instances will start with
final static String INITIAL_MESSAGE =
"Welcome to the Auction Service (No Bid made yet!)";
/** the identifier of this auction*/
private Object id;
/** Stores the ResourceProperties of this auction*/
private ResourcePropertySet propSet;
/** The message displayed on the auction. */
private ResourceProperty message;
private ResourceProperty lastModified;
private Calendar terminationTime;
private TopicList topicList;
/** initializes the Auction. */
public void initialize() throws Exception {
// choose an ID
this.id = new Integer(hashCode());
// create the resource property set
this.propSet = new SimpleResourcePropertySet
(AuctionConstants.RP_SET);
// create resource properties
this.message = new SimpleResourceProperty
(AuctionConstants.MESSAGE_RP);
// create the topic list
this.topicList = new SimpleTopicList(this);
// now wrap the message resource property so that it is also a topic
this.message = new ResourcePropertyTopic(this.message);
this.topicList.addTopic((Topic) this.message);
this.propSet.add(this.message);
this.message.add(INITIAL_MESSAGE);
// initialise the last modified RP to the present time
this.lastModified =
new SimpleResourceProperty(AuctionConstants.LAST_MODIFIED_RP);
this.propSet.add(lastModified);
this.lastModified.add(Calendar.getInstance());
// these are the RPs necessary for resource lifetime management
ResourceProperty prop;
// this property exposes the termination time
prop = new ReflectionResourceProperty(SimpleResource
PropertyMetaData.TERMINATION_TIME, this);
this.propSet.add(prop);
// this property exposes the current time
prop = new ReflectionResourceProperty(SimpleResource
PropertyMetaData.CURRENT_TIME, this);
this.propSet.add(prop);
}
public ResourcePropertySet getResourcePropertySet()
{
return propSet;
}
public void setMessage(String m)
{
this.message.set(0, m);
// now touch the last modified time
this.lastModified.set(0,Calendar.getInstance());
}
public Object getID()
{
return id;
}
public Calendar getCurrentTime() {
return Calendar.getInstance();
}
public TopicList getTopicList() {
return this.topicList;
}
}
|
规范要求任何包含通知的 WS-Resource 都以资源属性的形式提供一些主题信息。在这种情况下,我们将创建一个“message”资源属性名范围之外的主题。(记住,消费者订阅的是有关某个特定主题的通知。)
现在,必须让 <AUCTION_HOME>/deploy-server.wsdd 了解订阅操作。要实现这一点,则需要将 SubscribeProvider 作为一个受网格服务实例支持的操作来添加。修改如下:
<deployment name="defaultServerConfig"
xmlns="http://xml.apache.org/axis/wsdd/"
xmlns:aggr="http://mds.globus.org/aggregator/types"
xmlns:java="http://xml.apache.org/axis/wsdd/providers/java"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<service name="AuctionService" provider="Handler"
use="literal" style="document">
<parameter name="providers" value=
"GetRPProvider QueryRPProvider DestroyProvider
SubscribeProvider"/>
<parameter name="handlerClass" value="org.globus.axis.
providers.RPCProvider"/>
<parameter name="scope" value="Application"/>
<parameter name="allowedMethods" value="*"/>
<parameter name="activateOnStartup" value="true"/>
<parameter name="className"
value="org.globus.tutorial.auction.AuctionService"/>
<wsdlFile>share/schema/tutorial/Auction_service.
wsdl</wsdlFile>
</service>
</deployment>
|
到现在为止,我们已经添加了对处理通知消息订阅的服务器端支持。我们还要生成客户端工具来订阅这些通知并消费它们。我们需要在 <AUCTION_HOME>/src/org/globus/tutorial/auction/client/WatchBid.java 中实现这些客户端操作。要确保这个文件存在并包含以下内容:
package org.globus.tutorial.auction.client;
import org.apache.axis.message.MessageElement;
import org.apache.axis.message.Text;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.oasis.wsrf.properties.WSResourceProperties
ServiceAddressingLocator;
import org.oasis.wsrf.properties.QueryResourceProperties_Element;
import org.oasis.wsrf.properties.QueryResourcePropertiesResponse;
import org.oasis.wsrf.properties.QueryResourceProperties_PortType;
import org.oasis.wsrf.properties.QueryExpressionType;
import org.oasis.wsn.NotificationProducer;
import org.oasis.wsn.Subscribe;
import org.oasis.wsn.TopicExpressionType;
import org.oasis.wsn.WSBaseNotificationServiceAddressingLocator;
import org.apache.axis.types.URI;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.CommandLine;
import org.globus.tutorial.auction.AuctionConstants;
import org.oasis.wsrf.properties.ResourcePropertyValue
ChangeNotificationType;
import org.globus.wsrf.NotificationConsumerManager;
import org.globus.wsrf.NotifyCallback;
import org.globus.wsrf.WSNConstants;
import org.globus.wsrf.WSRFConstants;
import org.globus.wsrf.client.BaseClient;
import org.globus.wsrf.core.notification.ResourcePropertyValue
ChangeNotificationElementType;
import org.globus.wsrf.utils.AnyHelper;
import org.globus.wsrf.utils.FaultHelper;
import javax.xml.namespace.QName;
import javax.xml.rpc.Stub;
import java.util.List;
public class WatchBid extends BaseClient implements NotifyCallback {
final static WSResourcePropertiesServiceAddressingLocator locator =
new WSResourcePropertiesServiceAddressingLocator();
public static void main(String[] args) {
WatchBid client = new WatchBid();
// parse the commandline
try {
CommandLine line = client.parse(args);
} catch(ParseException e) {
System.err.println("Parsing failed: " + e.getMessage());
System.exit(1);
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
System.exit(1);
}
// subscribe to the resource property
try {
// first set up a NotificationConsumer endpoint
NotificationConsumerManager consumer = null;
// Create client side notification consumer
consumer = NotificationConsumerManager.getInstance();
consumer.startListening();
EndpointReferenceType consumerEPR =
consumer.createNotificationConsumer(client);
// now subscribe to the remote Auction instance's message,
// pointing notifications at the EPR we created above.
Subscribe request = new Subscribe();
request.setUseNotify(Boolean.TRUE);
request.setConsumerReference(consumerEPR);
TopicExpressionType topicExpression = new TopicExpressionType();
topicExpression.setDialect(WSNConstants.SIMPLE_TOPIC_DIALECT);
topicExpression.setValue(AuctionConstants.MESSAGE_RP);
request.setTopicExpression(topicExpression);
// geta port to talk to
WSBaseNotificationServiceAddressingLocator notifLocator =
new WSBaseNotificationServiceAddressingLocator();
NotificationProducer producerPort =
notifLocator.getNotificationProducerPort(client.getEPR());
EndpointReferenceType subscriptionEPR =
producerPort.subscribe(request).getSubscriptionReference();
// we could now use subscriptionEPR to manage the
// subscription, but for now we don't.
} catch(Exception e) {
if (client.isDebugMode()) {
FaultHelper.printStackTrace(e);
} else {
System.err.println("Error: " + FaultHelper.getMessage(e));
}
}
while(true) {
System.out.println("Waiting for notification. Ctrl-C to end.");
try {
Thread.sleep(30000);
} catch(Exception e) {
System.out.println("Interrupted while sleeping.");
}
}
}
// Notification callback - this will be called every time a notification
// is delivered to the consumer created above.
public void deliver(List topicPath,
EndpointReferenceType producer,
Object message) {
ResourcePropertyValueChangeNotificationType changeMessage =
((ResourcePropertyValueChangeNotificationElementType) message).
getResourcePropertyValueChangeNotification();
if(changeMessage != null) {
System.out.println("Got notification with value: " +
changeMessage.getNewValue().get_any()[0].getValue());
}
}
}
|
用户通过脚本来调用客户端订阅操作。要为客户端创建必需的脚本,则需要确保 <AUCTION_HOME>/etc/post-deploy.xml 包含以下内容:
<property name="class.name"
value="org.globus.tutorial.auction.client.CreateAuction"/>
</ant>
<!-Support for client side script to watch for
notifications about a bid -->
<ant antfile="${build.launcher}"
target="generateLauncher">
<property name="launcher-name" value="watch-bid"/>
<property name="class.name"
value="org.globus.tutorial.auction.client.WatchBid"/>
</ant>
</target>
</project>
|
在编译时,会在 GLOBUS_LOCATION/bin 目录中创建一个客户端可执行脚本,该脚本会正确设置环境变量,并根据客户机的行为向提供者订阅通知。
采用的通知操作
现在将启动容器来处理我们的例子,从而了解通知和订阅操作究竟是如何操作的。但是首先,我们需要编译这个新的拍卖服务代码,并将其作为容器的一部分进行部署。确保我们目前位于 AUCTION_HOME 目录中,并键入以下内容:
>> ant clean
>> ant deploy
|
如果一切运行良好,则应该看到一条 BUILD SUCCESSFUL 消息。否则,需要查看输出结果,找到问题并修正它,然后再次运行 ant 脚本。部署完服务之后,需要启动或重新启动这个容器。在容器窗口中,如果有必要,可以按下 <ctrl>-C 来停止正在运行的服务,然后再次键入以下内容:
>> globus-start-container -nosec
.......<other_services>.......
http://localhost:8080/wsrf/services/AuctionService
.......<other_services>.......
|
这个命令将启动容器。我们应该可以看到一个已安装服务的清单(包括 AuctionService),该窗口不应该返回控制权。要停止这个容器,可以在这个窗口中按下 <ctrl>-C 键。
另外打开一个窗口,并执行以下操作。要检查我们的设置是否工作,请按照下面的方式创建一个拍卖服务的实例:
>> $GLOBUS_LOCATION/bin/create-auction -s
http://localhost:8080/wsrf/services/AuctionService
New Auction Created...
EPR written to file: bid-581376501.epr
>>
|
现在,打开用来订阅通知的第三个窗口;并在第二个窗口中观察进行竞价之后的变化。
>> $GLOBUS_LOCATION/bin/watch-bid -e bid-581376501.epr
Waiting for notification. Ctrl-c to end.
|
我们已经创建了一台等待关于竞价发生变化的通知的客户机,并且它不会返回控制权。当提供新的竞价时,就会在第三个窗口中看到一条指示该操作的通知消息。
在第二个窗口中,给出了一个新的价格,它成功设置了当前的竞价值。如下所示:
>> $GLOBUS_LOCATION/bin/offer-bid -e bid-581376501.epr 12345
Current Bid = Welcome to the Auction Service (No Bid made yet!)
Your Bid = 12345
Bid Accepted.
>>
|
同时,观察第三个窗口,可以发现如下所示的变化:
Waiting for notification, Ctrl-c to end.
Got notification with value: 12345
Waiting for notification, Ctrl-c to end.
|
我们可以重复以上步骤,并在第二个终端窗口中使用 offer-bid 来修改当前的竞价值。每次成功修改竞价值之后,都会在第三个窗口中看到打印出的通知消息,其中包含与以前竞价值类似的当前竞价值。要在第三个窗口(watch-bid)中终止通知客户机,请按下 Ctrl-C 键。
结束语
本文简要介绍了 GT4 中的通知实现。并以 Auction 服务为例,介绍了向网格服务添加通知支持所需的步骤。本文内容涉及通知的以下方面:
- 安装一个可以工作的 GT4 及其通知实现。
- 为一个示例网格服务添加服务支持。
- 创建一台充当通知的消费者的客户机。
- 从通知生产者那里订阅和获得消息。
下载 | 描述 | 名字 | 大小 | 下载方法 |
|---|
| Classes needed to run article examples | notification.tar | 70 KB |
FTP | HTTP |
|---|
参考资料
关于作者  | 
|  | Babu Sundaram 从 Globus Toolkit 早期就开始积极地参与网格计算的研究工作。他在 Argonne National Labs 实习时就已成为 Globus 实现团队的成员之一。他拥有医学工程的学士学位和计算机科学的硕士学位,目前他正在休斯顿大学攻读自己的博士学位。他已经在很多网格会议和与网格有关的组织发表了许多文章,并且与别人合著了许多有关网格计算的书籍。他热爱教学,有时会作为一名讲师来讲解有关 Web 服务和网格计算的课程。他非常欢迎读者对本文进行反馈,您可以通过 babu@cs.uh.edu 与他联系。 |
对本文的评价
|