WebSphere Process Server throughput management, Part 1

Implementing an intelligent store and forward capability

WebSphere® Process Server is frequently used as the underpinning of solutions aimed at managing the event-driven synchronization of data between disparate application endpoints. An important consideration in such a solution is managing disparities in availability and concurrent processing capabilities of the source and target systems. Part 1 of a 2-part series presents a simple approach that implements an intelligent store and forward capability when target integration endpoints may become unavailable, independent of participating source systems.


Dr. Alan Hopkins (hopkinsa@uk.ibm.com), Consulting IT Specialist, IBM

Alan Hopkins is a Consulting IT Specialist with more than 20 years' experience in IBM and related middleware technologies. He is currently a member of IBM Software Group Services Worldwide Technology Practice, based at the IBM Hursley Development Lab in the United Kingdom. For the past several years, Alan has focused primarily on the WebSphere Business Process Management technology stack.



01 July 2009


Introduction

The service-oriented architecture (SOA) programming model exposed by WebSphere Process Server (hereafter called Process Server) helps implement a wide range of application design patterns. One commonly occurring pattern involves the synchronization of data between corresponding sets of source and target integration endpoint systems.

Any real world enterprise IT landscape includes a set of disparate applications that is deployed into differing runtime environments, exposing disparate interfaces. The design solution that overcomes these incompatibilities and facilitates the synchronization of data is the responsibility of the Integration Architect. This overall design must consider the differing non-functional characteristics of each integration endpoint system. Typical examples of differing non-functional characteristics include:

  • Availability: Scheduled or unscheduled outages are likely to affect endpoint systems in different ways. This results in situations where participants in a data synchronization solution are unavailable at different points in time. In the case of a target system outage, you often need to suspend the forwarding of further requests. It may also be desirable to simultaneously suspend the acceptance of additional synchronization requests into the integration middleware hub.
  • Concurrent processing capacity: Some target integration endpoint systems cannot process large concurrent volumes. Flooding these systems with excessive request volumes will result in operational problems, and at worst, may lead to target endpoint system failure.

Part 1 of a 2-part series presents a simple approach that implements an intelligent store and forward capability when target integration endpoints may become unavailable, independent of participating source systems. A subsequent article will present a simple approach to throttling integration flows within Process Server to ensure that the target endpoint systems are not flooded by high volumes of concurrent requests.

Scenario overview

Although it is possible to use Process Server to implement a sophisticated n-way data synchronization hub, for clarity we isolate the core problem within a simple scenario comprising two endpoint integration systems. The scenario consists of an integration flow that simply accepts inbound messages from a source system and propagates them to the target system. The source and target systems in our example are both provided by WebSphere MQ Queue Managers.

The integration solution deployed within Process Server comprises two Service Component Architecture (SCA) modules (Figure 1). The first, which you can view as a workload injector, is responsible for retrieving inbound messages as they arrive at the source Queue Manager. The second SCA module, which you can view as a message forwarder, is responsible for propagating messages to the target Queue Manager. These SCA modules are connected directly using the SCA wiring metaphor to form a simple end-to-end synchronization solution. Note, however, that the modular nature of SCA allows these modules to be wired together in different patterns; for example, to incorporate components that encapsulate additional functionality. Incoming messages are retrieved and outbound messages are forwarded through SCA Export and Import constructs, respectively, each having a WebSphere MQ binding type.

Figure 1. Scenario SCA Assembly diagram

When both Queue Managers are available, messages are simply transferred. However, things get a little more interesting when there is a target system outage. There are a number of ways to handle this situation. The default behavior is to continue retrieving messages as they arrive at the source Queue Manager, and to attempt to deliver them to the target integration endpoint system. Each attempted delivery then results in a thrown fault, which is intercepted by the Process Server Failed Event subsystem. These failed events can be managed from within the Failed Event Manager. However, for systems with high levels of data synchronization throughput, the number of captured failed events can grow very large. In these circumstances, a more manageable approach is preferable: suspend the consumption of inbound events as soon as possible after a target endpoint system outage is detected.

The key mechanism that allows these two otherwise independent SCA modules to work cooperatively is based around access to a shared variable that is defined within an Object Cache facility provided by the underlying WebSphere Application Server runtime engine.

The request injector component checks this shared availability indicator prior to the retrieval of a message from the source Queue Manager. If the indicator shows that the endpoint service is available, the message is retrieved for processing. If the endpoint service is flagged as being unavailable, a Human Task is created to serve as a service outage notification.

At the start of inbound message processing, the system availability indicator is set to "true", indicating that the endpoint system is available. A failed attempt to forward a message to the unavailable target Queue Manager results in a thrown fault. This is caught by the Fault Handler within the request forwarder component. Upon catching the fault, the shared system availability flag is updated to indicate a system outage.
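
As a minimal sketch of this shared-indicator idea, the following Java fragment models the availability flag as an entry in a shared map. The class name AvailabilityFlag, the key targetAvailable, and the use of a plain ConcurrentHashMap are illustrative assumptions and are not taken from the sample code. In a real deployment the map would be the Object Cache instance, which WebSphere Application Server typically exposes through the com.ibm.websphere.cache.DistributedMap interface (itself a java.util.Map).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: wraps a shared Map that stands in for the Object Cache instance.
final class AvailabilityFlag {
    // Hypothetical cache key; the article does not name the key it uses.
    static final String KEY = "targetAvailable";

    private final Map<String, Object> cache;

    AvailabilityFlag(Map<String, Object> cache) {
        this.cache = cache;
    }

    // Called by the injector at the start of inbound message processing.
    void markAvailable() {
        cache.put(KEY, Boolean.TRUE);
    }

    // Called by the forwarder's fault handler after a failed delivery.
    void markUnavailable() {
        cache.put(KEY, Boolean.FALSE);
    }

    // Checked by the injector before each retrieval; an unset flag counts as available.
    boolean isAvailable() {
        return !Boolean.FALSE.equals(cache.get(KEY));
    }
}

final class AvailabilityFlagDemo {
    public static void main(String[] args) {
        // Both modules hold their own wrapper around the same shared cache.
        Map<String, Object> sharedCache = new ConcurrentHashMap<>();
        AvailabilityFlag injectorView = new AvailabilityFlag(sharedCache);
        AvailabilityFlag forwarderView = new AvailabilityFlag(sharedCache);

        injectorView.markAvailable();
        System.out.println(injectorView.isAvailable());   // true
        forwarderView.markUnavailable();
        System.out.println(injectorView.isAvailable());   // false
    }
}
```

The point of the sketch is that neither module calls the other to report the outage; each only reads or writes the same cache entry.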


Key implementation details

WebSphere Object Cache

We exploit the Object Cache capability provided by the underlying WebSphere Application Server runtime engine to provide a shared variable that can be read and updated from both WorkloadInjector and RequestPropagator SCA modules. You will need to define a new Object Cache instance as shown in Figure 2 to execute the example code. Further details are provided in the Demonstration of endpoint system available execution path section later in this article.

Figure 2. Predefined object cache instance

Workload Injector

The Workload Injector is provided as an SCA module, with a central SCA component implemented as a BPEL process (Figure 3). The module exposes two exports, the first of which is used to accept the initial triggering event used to invoke the BPEL process. The second export is used to iteratively retrieve incoming synchronization requests delivered to the source Queue Manager as messages. The module has a single import, which is used to wire the component to the Request Propagator module.

Figure 3. Workload Injector SCA module

At the start of inbound message processing, the system availability indicator is set to true to indicate that the endpoint system is available. The remainder of the BPEL process manages the iterative retrieval of a message from the source Queue Manager. At the start of each iteration, the system availability indicator determines whether to retrieve the next message for processing, or to suspend processing by scheduling a Human Task. Acceptance and completion of the task allow the end user to choose between continued message retrieval or process termination.
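
The control flow of this loop can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the real implementation is a BPEL process, the in-memory Deque stands in for the source queue, the AtomicBoolean stands in for the shared availability indicator, and all names (InjectorLoopSketch, Outcome, run) are hypothetical. Unlike the real process, which waits for the next message, the sketch returns when the queue is empty so that it terminates.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class InjectorLoopSketch {
    enum Outcome { QUEUE_DRAINED, SUSPENDED }

    static Outcome run(Deque<String> sourceQueue,
                       AtomicBoolean targetAvailable,
                       Consumer<String> forwarder) {
        // Start of inbound message processing: assume the target is available.
        targetAvailable.set(true);
        while (true) {
            // Check the indicator before each retrieval.
            if (!targetAvailable.get()) {
                // The BPEL process would schedule a Human Task at this point.
                return Outcome.SUSPENDED;
            }
            String message = sourceQueue.poll();
            if (message == null) {
                return Outcome.QUEUE_DRAINED;
            }
            forwarder.accept(message);
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        AtomicBoolean available = new AtomicBoolean();
        // Simulated forwarder: delivery of "m2" fails and flags the outage.
        Consumer<String> failingOnSecond = m -> {
            if (m.equals("m2")) {
                available.set(false);
            }
        };
        // Prints: SUSPENDED, left on queue: [m3]
        System.out.println(run(queue, available, failingOnSecond) + ", left on queue: " + queue);
    }
}
```

In the usage example, the message that hit the outage has already been taken from the queue, while later messages stay on the source queue until processing resumes.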

Figure 4. Workload Injector process

Acceptance of inbound messages from the source queue manager is handled by an SCA Export with a WebSphere MQ binding type. The construct is bound to the source queue manager, as shown in Figure 5.

Figure 5. InboundMessagesExport MQ binding configuration

During retrieval, the transformation of synchronization data between an MQ message payload and the corresponding xsd data type used to encapsulate data internally within Process Server is managed by a custom Data Handler. The source code for this custom Data Handler is found in the Project Interchange file provided as a download accompanying this article.
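
To give a feel for what such a transformation involves, here is a minimal Java sketch of the delimiter handling only. It does not use the Process Server Data Handler interfaces, and it is not the code from the Project Interchange file; the class and method names are hypothetical and the sample values are invented. It assumes the comma-delimited inbound format and the "|"-delimited outbound format used by the test messages in this article.

```java
import java.util.Arrays;
import java.util.List;

final class DelimitedPayloadSketch {
    // Inbound direction: split the comma-delimited message payload into fields.
    // The limit of -1 keeps empty trailing fields instead of dropping them.
    static List<String> parseInbound(String payload) {
        return Arrays.asList(payload.split(",", -1));
    }

    // Outbound direction: join the fields into a "|"-delimited payload.
    static String formatOutbound(List<String> fields) {
        return String.join("|", fields);
    }

    public static void main(String[] args) {
        // Field order assumed from the test message format:
        // firstName,lastName,addr1,addr2,city,state,zip,CustomerID,objType
        String inbound = "Jane,Doe,1 Main St,,Springfield,IL,62701,C001,Customer";
        // Prints: Jane|Doe|1 Main St||Springfield|IL|62701|C001|Customer
        System.out.println(formatOutbound(parseInbound(inbound)));
    }
}
```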

Correlation of incoming messages

WebSphere Process Server is optimized to provide a scalable, performant execution environment for large volumes of concurrent execution threads. Multiple in-flight process instances may therefore exist at any instant.

Inbound messages are forwarded to the correct in-flight process instance by matching the value of one or more designated fields in the message payload to previously assigned values. This capability is specified via the use of a Correlation Set and specified set of contained properties. In the current example, we have defined a Correlation Set with a single contained property, objType, specifying managed synchronization (Figure 6).
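
The matching idea behind a Correlation Set can be sketched in a few lines of Java. This is a simplified illustration, not the BPEL engine's mechanism: the class CorrelationSketch and its methods are hypothetical, and the sketch assumes that objType is the last comma-delimited field of the payload, as in the test message format used later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CorrelationSketch {
    // One inbox per in-flight instance, keyed by the correlation property value.
    private final Map<String, List<String>> inboxByObjType = new HashMap<>();

    // Starting an instance fixes the objType value it will correlate on.
    void startInstance(String objType) {
        inboxByObjType.putIfAbsent(objType, new ArrayList<>());
    }

    // Routes a message to the instance whose objType matches; false if none matches.
    boolean deliver(String payload) {
        String[] fields = payload.split(",", -1);
        String objType = fields[fields.length - 1]; // assumed position of objType
        List<String> inbox = inboxByObjType.get(objType);
        if (inbox == null) {
            return false;
        }
        inbox.add(payload);
        return true;
    }

    List<String> inboxFor(String objType) {
        return inboxByObjType.getOrDefault(objType, Collections.emptyList());
    }

    public static void main(String[] args) {
        CorrelationSketch engine = new CorrelationSketch();
        engine.startInstance("Customer");
        System.out.println(engine.deliver("Jane,Doe,C001,Customer")); // true
        System.out.println(engine.deliver("Widget,O042,Order"));      // false
    }
}
```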

Figure 6. Correlation property definition

Request Propagator

This component is also implemented as a BPEL process and uses an SCA Import with a WebSphere MQ binding to interface to the target Queue Manager.

Figure 7. Request Propagator SCA assembly diagram

The main feature of interest here is the use of a BPEL Fault Handler to catch the Fault thrown when an attempt to forward a message to the target Queue Manager fails. Within this Fault Handler, we update the system availability indicator to reflect the unavailability of the target Queue Manager.
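
In Java terms, the Fault Handler behaves much like a catch block around the forwarding call. The sketch below is an analogy under stated assumptions rather than the BPEL implementation: the Target interface, the ForwarderSketch class, and the AtomicBoolean standing in for the shared availability indicator are all hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

final class ForwarderSketch {
    // Hypothetical stand-in for the MQ import that delivers to the target queue.
    interface Target {
        void send(String message) throws Exception;
    }

    // Returns true if the message was delivered, false if the fault path ran.
    static boolean forward(String message, Target target, AtomicBoolean targetAvailable) {
        try {
            target.send(message);
            return true;
        } catch (Exception fault) {
            // Fault handler: record the outage so the injector stops retrieving.
            targetAvailable.set(false);
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicBoolean available = new AtomicBoolean(true);
        Target stoppedQueueManager = m -> {
            throw new IOException("target queue manager not reachable");
        };
        System.out.println(forward("m1", stoppedQueueManager, available)); // false
        System.out.println(available.get());                               // false
    }
}
```

Note that the sketch only flips the indicator; it does not retry or store the failed message. In the scenario described in this article, the failed in-flight message is handled separately through the Failed Event Manager.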

Figure 8. Request Propagator process

Scenario execution

Demonstration of endpoint system available execution path

Let's first demonstrate the successful managed synchronization of messages from a source to a target Queue Manager. The SCA export and import used to interface to the source and target queue managers, respectively, have been configured to use the following definitions.

For the Source Queue Manager:

  • Queue Manager name: Source_QM
  • Source Queue name: Inbound_Queue
  • Host name: localhost
  • TCP/IP Listener port: 1416

For the Target Queue Manager:

  • Queue Manager name: Target_QM
  • Target Queue name: Outbound_Queue
  • Host name: localhost
  • TCP/IP Listener port: 1417

You must either create these Queue Managers in your environment, or modify the InboundMessages and OutboundMessages SCA Export/Import configurations to point to your source and target Queue Manager, respectively.

You must define the Object Cache instance that is used to provide shared variables. From the WebSphere Process Server administration console, navigate to Resources > Cache instances > Object cache instances and click New. Define an Object Cache instance with the parameters shown in Figure 9.

Figure 9. Object Cache definition

Note that you will also need to deploy the two applications provided in the accompanying Project Interchange file to your WebSphere Process Server instance. Ensure that both source and target queue managers are started. A convenient way to manage queue managers is to use the MQ Explorer utility program provided with WebSphere MQ.

You must now start the RequestInjector process. For simplicity, we shall use the BPC Explorer:

  1. Open up the BPC Explorer from either the WebSphere Integration Developer shell or a Web browser.
  2. Start a RequestInjector process instance by navigating to Process Instances.
  3. Select the process template and click Start Instance. At the Process Input Message panel, provide a process instance name and a value of Customer for the objType field (Figure 10).
    Figure 10. Start Request Injector process
  4. Click Submit to invoke the process instance. The process instance executes to the Retrieve Message Receive activity, where the process will wait pending the arrival of a message for processing.
  5. Open WebSphere MQ Explorer and write a message to Inbound_Queue in Source_QM by right-clicking the queue name and selecting Put Test Message (Figure 11).
    Figure 11. Put Test message
  6. Provide a test message payload with the following format (Figure 12):
    firstName,lastName,addr1,addr2,city,state,zip,CustomerID,Customer
    Figure 12. Test Message data
  7. Click Put Message to put the message to the queue. This causes the waiting BPEL Receive activity to accept the message for processing. Successful propagation of the message to Outbound_Queue on Target_QM is demonstrated by the incremented value of the Current Queue Depth for the target queue. You can view the outbound message by right-clicking Outbound_Queue and selecting Browse Messages. Note that the field delimiter in the outbound message has been translated from a "," to a "|" symbol (Figure 13). This translation occurred within the custom Data Handler during the outbound translation from xsd data format to a character string message payload.
    Figure 13. Message propagated to Target Queue

At this point the RequestInjector has iterated, and once again will be waiting for the next message for propagation.

Demonstration of endpoint system unavailable execution path

Assuming that RequestInjector is waiting for the next inbound message for synchronization, as discussed above, you can proceed directly to the system unavailable path through the intelligent store and forward implementation.

  1. Stop Target_QM from WebSphere MQ Explorer by right-clicking the queue manager name and selecting Stop.
  2. Submit a further inbound message by following the steps detailed in the previous section. During propagation of the synchronization message, WebSphere Process Server will throw an Exception due to the non-availability of Target_QM. The Stack trace associated with the Exception is written to the WebSphere Process Server SystemOut.log file, together with messages showing that the Fault Handler defined within the RequestPropagator BPEL Process has been invoked.
  3. Upon detection of an error during the synchronization downstream, the WorkloadInjector process instance will generate a Notification in the form of a Human Task. You can now take a look at this task by selecting My To-dos from within the BPC Explorer (Figure 14).
    Figure 14. Target Endpoint system outage Human Task
  4. Claim the task by selecting it and clicking Work on. You can see from the explanatory text in Figure 15 and the accompanying root cause error message that the error is related to the queue manager Target_QM.
    Figure 15. Target Endpoint system outage notification message
  5. Before completing the task, it is necessary to correct the problem and deal with the in-flight failed synchronization flow. Start Target_QM from the WebSphere MQ Explorer. Errors encountered during asynchronous SCA invocations are persisted as events in the Failed Event subsystem. Open up the Failed Event Manager by navigating to Integration Applications > Failed Event Manager from within the administration console. Click Get all to retrieve all failed events (Figure 16).
    Figure 16. Failed Event Manager
  6. Click the Event ID link to view further details (Figure 17).
    Figure 17. Failed Event details
  7. You can view the payload of the data synchronization message, and modify it if necessary, by clicking View business data (Figure 18).
    Figure 18. Failed Event Payload

Since you have fixed the root cause of the problem, you can now resubmit the Failed Event, allowing the data synchronization flow to execute to completion. Observe the arrival of the message in the Target_QM Outbound_Queue for evidence of a successful resubmission.

To continue data synchronization, you must complete the outstanding Human Task. Ensure that the output checkbox in the Task Output Message is selected and click Complete.

You can now continue to submit messages for synchronization by putting messages to Inbound_Queue using WebSphere MQ Explorer.

Note that you can terminate the process instance at any point in time from the Process Instances > Administered By Me panel of the BPC Explorer.


Conclusion

WebSphere Process Server provides a functionally rich, component-based programming model that is optimized for implementing modular solutions to a wide range of business problems. These solutions are frequently required to establish integration between multiple disparate endpoint systems. Therefore, the Integration Architect responsible for solution design must consider operational differences between endpoint systems, such as availability and concurrent processing capability. This article described how to use the Process Server programming model to implement an intelligent store and forward capability that provides near real-time responsiveness during an unexpected unavailability of a target endpoint system.


Acknowledgement

The original motivation for this article came about as a result of discussions with my IBM Software Group colleague, Paul Verschueren. Paul's original suggestion of a high level approach to implement an intelligent store and forward pattern is gratefully acknowledged.


Download

Description    Name                              Size
Code sample    IntelligentStoreForward_PI.zip    66 KB
