The service-oriented architecture (SOA) programming model exposed by WebSphere Process Server (hereafter called Process Server) helps implement a wide range of application design patterns. One commonly occurring pattern involves the synchronization of data between corresponding sets of source and target integration endpoint systems.
Any real world enterprise IT landscape includes a set of disparate applications that is deployed into differing runtime environments, exposing disparate interfaces. The design solution that overcomes these incompatibilities and facilitates the synchronization of data is the responsibility of the Integration Architect. This overall design must consider the differing non-functional characteristics of each integration endpoint system. Typical examples of differing non-functional characteristics include:
- Availability: Scheduled or unscheduled outages are likely to affect endpoint systems in different ways. This results in situations where participants in a data synchronization solution are unavailable at different points in time. In the case of a target system outage, you often need to suspend the forwarding of further requests. It may also be desirable to simultaneously suspend the acceptance of additional synchronization requests into the integration middleware hub.
- Concurrent processing capacity: Some target integration endpoint systems cannot process large concurrent volumes. Flooding these systems with excessive request volumes will result in operational problems, and at worst, may lead to target endpoint system failure.
This article, Part 1 of a two-part series, presents a simple approach to implementing an intelligent store and forward capability for situations where target integration endpoints may become unavailable independently of the participating source systems. A subsequent article will present a simple approach to throttling integration flows within Process Server to ensure that the target endpoint systems are not flooded by high volumes of concurrent requests.
Although it is possible to use Process Server to implement a sophisticated n-way data synchronization hub capability, for the purposes of clarity, we isolate the core problem within a simple scenario comprising two endpoint integration systems. This scenario consists of an integration flow that simply accepts inbound messages from a source system and propagates them to the target system. The source and target systems in our example are both provided by WebSphere MQ Queue Managers.
The integration solution deployed within Process Server comprises two Service Component Architecture (SCA) modules (Figure 1). The first, which you can view as a workload injector, is responsible for retrieving inbound messages as they arrive at the source Queue Manager. The second SCA module, which you can view as a message forwarder, is responsible for propagating messages to the target Queue Manager. These SCA modules are connected together directly using the SCA wiring metaphor to form a simple end-to-end synchronization solution. However, note that the modular nature of SCA allows these modules to be wired together in different patterns; for example, to incorporate additional components that encapsulate additional functionality. Retrieval of incoming messages and forwarding of outbound messages are handled by SCA Export and Import constructs, respectively, each having a WebSphere MQ binding type.
Figure 1. Scenario SCA Assembly diagram
When both Queue Managers are available, messages are simply transferred. Things get more interesting when there is a target system outage, and there are a number of ways to handle this situation. The default behavior is to continue retrieving messages as they arrive at the source Queue Manager and to attempt to deliver them to the target integration endpoint system. Each attempted delivery then results in a thrown fault, which is intercepted by the Process Server Failed Event subsystem. These failed events can be managed from within the Failed Event Manager. However, for systems with high levels of data synchronization throughput, the number of captured failed events can grow very large. In these circumstances, a more manageable approach is preferable: suspend the consumption of inbound events as soon as possible after a target endpoint system outage is detected.
The key mechanism that allows these two otherwise independent SCA modules to work cooperatively is based around access to a shared variable that is defined within an Object Cache facility provided by the underlying WebSphere Application Server runtime engine.
The Workload Injector component checks this shared variable, which acts as a system availability indicator, prior to the retrieval of a message from the source Queue Manager. If the availability indicator shows that the endpoint service is available, the message is retrieved for processing. If the endpoint service is flagged as being unavailable, a Human Task is created to serve as a service outage notification.
At the start of inbound message processing, the system availability indicator is set to "true", indicating that the endpoint system is available. A failed attempt to forward a message to the unavailable target Queue Manager results in a thrown fault. This is caught by the Fault Handler within the Request Propagator component. Upon catching the fault, the Fault Handler updates the shared system availability indicator to show a system outage.
Key implementation details
WebSphere Object Cache
We exploit the Object Cache capability provided by the underlying WebSphere Application Server runtime engine to provide a shared variable that can be read and updated from both WorkloadInjector and RequestPropagator SCA modules. You will need to define a new Object Cache instance as shown in Figure 2 to execute the example code. Further details are provided in the Demonstration of endpoint system available execution path section later in this article.
Figure 2. Predefined object cache instance
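The shared indicator described above can be sketched in plain Java. This is a minimal illustration, not the code from the Project Interchange file: the class name `AvailabilityFlag` and the cache key are hypothetical, and a `ConcurrentHashMap` stands in for the Object Cache instance. On WebSphere Application Server, an object cache instance is normally obtained through a JNDI lookup and exposed as a `com.ibm.websphere.cache.DistributedMap`, which is itself a `java.util.Map`, so the same put and get calls should apply there.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sketch of the shared availability indicator. On the server, the backing map
 * would be the Object Cache instance; here a ConcurrentHashMap stands in so
 * that the example runs anywhere.
 */
final class AvailabilityFlag {
    // Hypothetical cache key; the article does not name the real one.
    static final String KEY = "targetEndpointAvailable";

    private final Map<Object, Object> cache;

    AvailabilityFlag(Map<Object, Object> cache) {
        this.cache = cache;
    }

    /** Called by the injector at the start of inbound message processing. */
    void markAvailable() {
        cache.put(KEY, Boolean.TRUE);
    }

    /** Called by the propagator's fault handler when forwarding fails. */
    void markUnavailable() {
        cache.put(KEY, Boolean.FALSE);
    }

    /** An unset flag is treated as "available" (an assumption of this sketch). */
    boolean isAvailable() {
        return !Boolean.FALSE.equals(cache.get(KEY));
    }
}

final class AvailabilityFlagDemo {
    public static void main(String[] args) {
        // Both modules see the same cache, so an update by one is visible to the other.
        Map<Object, Object> sharedCache = new ConcurrentHashMap<>();
        AvailabilityFlag injectorView = new AvailabilityFlag(sharedCache);
        AvailabilityFlag propagatorView = new AvailabilityFlag(sharedCache);

        injectorView.markAvailable();
        System.out.println(injectorView.isAvailable()); // true
        propagatorView.markUnavailable();
        System.out.println(injectorView.isAvailable()); // false
    }
}
```

Keeping the flag behind a small wrapper means the two modules share only a key and a Boolean value, which keeps them otherwise independent.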
The Workload Injector is provided as an SCA module, with a central SCA component implemented as a BPEL process (Figure 3). The module exposes two exports, the first of which is used to accept the initial triggering event used to invoke the BPEL process. The second export is used to iteratively retrieve incoming synchronization requests delivered to the source Queue Manager as messages. The module has a single import, which is used to wire the component to the Request Propagator module.
Figure 3. Workload Injector SCA module
At the start of inbound message processing, the system availability indicator is set to true to indicate that the endpoint system is available. The remainder of the BPEL process manages the iterative retrieval of a message from the source Queue Manager. At the start of each iteration, the system availability indicator determines whether to retrieve the next message for processing, or to suspend processing by scheduling a Human Task. Acceptance and completion of the task allow the end user to choose between continued message retrieval or process termination.
Figure 4. Workload Injector process
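The control flow of the Workload Injector process can be approximated by the following Java sketch. It is only an analogy for the BPEL process: the source queue is an in-memory `Queue`, the Human Task is reduced to a `BooleanSupplier` that returns the operator's decision, and all names are hypothetical. Resetting the indicator when the operator chooses to continue is an assumption of this sketch, and the real process waits at a Receive activity rather than ending when the queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

final class WorkloadInjectorLoop {
    /**
     * Drains the source queue, checking the availability indicator before each
     * retrieval. Returns the number of outage notifications that were raised.
     */
    static int run(Queue<String> source,
                   AtomicBoolean targetAvailable,
                   Consumer<String> forward,
                   BooleanSupplier operatorContinues) {
        targetAvailable.set(true); // indicator starts as "available"
        int notifications = 0;
        while (!source.isEmpty()) {
            if (!targetAvailable.get()) {
                notifications++; // stands in for creating the Human Task
                if (!operatorContinues.getAsBoolean()) {
                    return notifications; // operator chose process termination
                }
                targetAvailable.set(true); // assumption: reset on resume
            }
            forward.accept(source.poll());
        }
        return notifications;
    }
}

final class WorkloadInjectorLoopDemo {
    public static void main(String[] args) {
        Queue<String> source = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        AtomicBoolean available = new AtomicBoolean();
        List<String> delivered = new ArrayList<>();

        // Forwarding "m2" simulates a fault: the indicator is switched off.
        int tasks = WorkloadInjectorLoop.run(source, available, message -> {
            if (message.equals("m2")) {
                available.set(false);
            } else {
                delivered.add(message);
            }
        }, () -> true);

        System.out.println(delivered + " " + tasks); // [m1, m3] 1
    }
}
```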
Acceptance of inbound messages from the source queue manager is handled by an SCA Export with a WebSphere MQ binding type. The construct is bound to the source queue manager, as shown in Figure 5.
Figure 5. InboundMessagesExport MQ binding configuration
During retrieval, the transformation of synchronization data between an MQ message payload and the corresponding XSD data type used to encapsulate data internally within Process Server is managed by a custom Data Handler. The source code for this custom Data Handler is included in the Project Interchange file provided as a download accompanying this article.
Correlation of incoming messages
WebSphere Process Server is optimized to provide a scalable, performant execution environment for large volumes of concurrent execution threads. It is therefore possible that multiple in-flight process instances will exist at any instant.
Inbound messages are forwarded to the correct in-flight process instance by matching the value of one or more designated fields in the message payload to previously assigned values. This capability is specified via the use of a Correlation Set and specified set of contained properties. In the current example, we have defined a Correlation Set with a single contained property, objType, specifying managed synchronization (Figure 6).
Figure 6. Correlation property definition
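The idea behind correlation can be illustrated with a small Java sketch. It is not how the BPEL engine is implemented; it only shows a message being matched to a waiting instance by the value of its correlation property. The class name, the instance names, and the assumption that `objType` is the first comma-separated field of the payload are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class CorrelationRouter {
    // Maps the correlation property value (objType) to a waiting instance name.
    private final Map<String, String> instancesByObjType = new HashMap<>();

    /** Registers an in-flight instance under the objType it was started with. */
    void startInstance(String instanceName, String objType) {
        instancesByObjType.put(objType, instanceName);
    }

    /**
     * Finds the instance whose correlation value matches the message.
     * Assumption: objType is the first comma-separated field of the payload.
     */
    Optional<String> route(String payload) {
        String objType = payload.split(",", 2)[0].trim();
        return Optional.ofNullable(instancesByObjType.get(objType));
    }
}

final class CorrelationRouterDemo {
    public static void main(String[] args) {
        CorrelationRouter router = new CorrelationRouter();
        router.startInstance("sync-customers", "Customer");

        System.out.println(router.route("Customer,1001,Jane Doe")); // Optional[sync-customers]
        System.out.println(router.route("Order,42"));               // Optional.empty
    }
}
```

A message whose `objType` matches no registered instance has nowhere to go in this sketch, which is why the process instance is started with an `objType` value before any test message is put to the queue.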
The Request Propagator component is also implemented as a BPEL process and uses an SCA import with a WebSphere MQ binding to interface to the target Queue Manager.
Figure 7. Request Propagator SCA assembly diagram
The main feature of interest here is the use of a BPEL Fault Handler to catch the Fault thrown when an attempt to forward a message to the target Queue Manager fails. Within this Fault Handler, we update the system availability indicator to reflect the unavailability of the target Queue Manager.
Figure 8. Request Propagator process
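In Java terms, the Fault Handler behaves much like a catch block around the forwarding call. The sketch below is an analogy under stated assumptions: `TargetQueue` is a hypothetical stand-in for the SCA import with the MQ binding, and an `AtomicBoolean` stands in for the shared indicator held in the Object Cache.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

final class RequestPropagatorSketch {
    /** Hypothetical stand-in for the SCA import bound to the target Queue Manager. */
    interface TargetQueue {
        void put(String message) throws Exception;
    }

    /**
     * Tries to forward one message. On failure, the catch block plays the role
     * of the BPEL Fault Handler and flags the target as unavailable.
     * Returns true if the message was forwarded.
     */
    static boolean propagate(String message, TargetQueue target, AtomicBoolean targetAvailable) {
        try {
            target.put(message);
            return true;
        } catch (Exception fault) {
            targetAvailable.set(false); // shared indicator now shows an outage
            return false;
        }
    }
}

final class RequestPropagatorDemo {
    public static void main(String[] args) {
        AtomicBoolean available = new AtomicBoolean(true);

        boolean first = RequestPropagatorSketch.propagate("m1", message -> { }, available);
        System.out.println(first + " " + available.get());  // true true

        boolean second = RequestPropagatorSketch.propagate("m2", message -> {
            throw new IOException("queue manager stopped"); // simulated outage
        }, available);
        System.out.println(second + " " + available.get()); // false false
    }
}
```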
Demonstration of endpoint system available execution path
Let's first demonstrate the successful managed synchronization of messages from a source to a target Queue Manager. The SCA export and import used to interface to the source and target queue managers, respectively, have been configured to use the following definitions.
For the Source Queue Manager:
- Queue Manager name: Source_QM
- Source Queue name: Inbound_Queue
- Host name: localhost
- TCP/IP Listener port: 1416
For the Target Queue Manager:
- Queue Manager name: Target_QM
- Target Queue name: Outbound_Queue
- Host name: localhost
- TCP/IP Listener port: 1417
You must either create these Queue Managers in your environment, or modify the InboundMessages and OutboundMessages SCA Export/Import configurations to point to your source and target Queue Manager, respectively.
You must define the Object Cache instance that is used to provide shared variables. From the WebSphere Process Server administration console, navigate to Resources > Cache instances > Object cache instances and click New. Define an Object Cache instance with the parameters shown in Figure 9.
Figure 9. Object Cache definition
Note that you will also need to deploy the two applications provided in the accompanying Project Interchange file to your WebSphere Process Server instance. Ensure that both source and target queue managers are started. A convenient way to manage queue managers is to use the MQ Explorer utility program provided with WebSphere MQ.
You must now start the RequestInjector process. For simplicity, we shall use the BPC Explorer:
- Open up the BPC Explorer from either the WebSphere Integration Developer shell or a Web browser.
- Start a RequestInjector process instance by navigating to Process Instances.
- Select the process template and click Start Instance. At the Process Input Message panel, provide a process instance name and a value of Customer for the objType field (Figure 10).
Figure 10. Start Request Injector process
- Click Submit to invoke the process instance. The process instance executes to the Retrieve Message Receive activity, where the process will wait pending the arrival of a message for processing.
- Open WebSphere MQ Explorer and write a message to Inbound_Queue in Source_QM by right-clicking the queue name and selecting Put Test Message (Figure 11).
Figure 11. Put Test message
- Provide a test message payload with the following format (Figure 12):
Figure 12. Test Message data
- Click Put Message to put the message to the queue. This causes the waiting BPEL Receive activity to accept the message for processing. Successful propagation of the message to Outbound_Queue on Target_QM is demonstrated by the incremented value of the Current Queue Depth for the target queue. You can view the outbound message by right-clicking Outbound_Queue and selecting Browse Messages. Note that the field delimiter in the outbound message has been translated from a "," to a "|" symbol (Figure 13). This translation occurred within the custom Data Handler during the outbound translation from XSD data format to a character string message.
Figure 13. Message propagated to Target Queue
At this point the RequestInjector has iterated, and once again will be waiting for the next message for propagation.
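The delimiter translation seen in the outbound message can be reproduced with a few lines of Java. This is only a sketch of the observable effect, not the custom Data Handler shipped in the Project Interchange file; the class name and the sample payload are hypothetical.

```java
final class DelimiterTranslator {
    /**
     * Splits a comma-delimited inbound payload into fields and re-joins them
     * with the "|" delimiter. A negative split limit keeps trailing empty fields.
     */
    static String toOutbound(String inbound) {
        String[] fields = inbound.split(",", -1);
        return String.join("|", fields);
    }
}

final class DelimiterTranslatorDemo {
    public static void main(String[] args) {
        System.out.println(DelimiterTranslator.toOutbound("Customer,1001,Jane Doe"));
        // Customer|1001|Jane Doe
    }
}
```

Splitting into fields first, rather than replacing characters in place, mirrors the two-step path through an intermediate structured form described above.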
Demonstration of endpoint system unavailable execution path
Assuming that RequestInjector is waiting, pending the arrival of the next inbound message for synchronization as discussed above, you can proceed directly to execute the system unavailable path through the Intelligent store and forward implementation.
- Stop Target_QM from WebSphere MQ Explorer by right-clicking the queue manager name and selecting Stop.
- Submit a further inbound message by following the steps detailed in the previous section. During propagation of the synchronization message, WebSphere Process Server will throw an Exception due to the non-availability of Target_QM. The Stack trace associated with the Exception is written to the WebSphere Process Server SystemOut.log file, together with messages showing that the Fault Handler defined within the RequestPropagator BPEL Process has been invoked.
- Upon detection of an error during the synchronization downstream, the WorkloadInjector process instance will generate a Notification in the form of a Human Task. You can now take a look at this task by selecting My To-dos from within the BPC Explorer (Figure 14).
Figure 14. Target Endpoint system outage Human Task
- Claim the task by selecting it and clicking Work on. You can see from the explanatory text in Figure 15 and the accompanying root cause error message that the error is related to the queue manager.
Figure 15. Target Endpoint system outage notification message
- Before completing the task, it is necessary to correct the problem and deal with the in-flight failed synchronization flow. Start Target_QM from the WebSphere MQ Explorer. Errors encountered during asynchronous SCA invocations are persisted as events in the Failed Event subsystem. Open up the Failed Event Manager by navigating to Integration Applications > Failed Event Manager from within the administration console. Click Get all to retrieve all failed events (Figure 16).
Figure 16. Failed Event Manager
- Click the Event ID link to view further details (Figure 17).
Figure 17. Failed Event details
- You can view the payload of the data synchronization message and modify it if necessary by clicking View business data (Figure 18).
Figure 18. Failed Event Payload
Since you have fixed the root cause of the problem, you can now resubmit the Failed Event, allowing the data synchronization flow to execute to completion. Observe the arrival of the message in the Target_QM Outbound_Queue for evidence of a successful resubmission.
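The store-and-resubmit behavior can be sketched in Java as follows. This is a simplified in-memory analogy, not the Failed Event subsystem itself: `FailedEventStore` and `Target` are hypothetical names, and a payload that fails delivery is simply held in a queue until it is resubmitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class FailedEventStore {
    /** Hypothetical stand-in for the target endpoint. */
    interface Target {
        void deliver(String payload) throws Exception;
    }

    private final Deque<String> failed = new ArrayDeque<>();

    /** Attempts delivery; a payload that cannot be delivered is stored. */
    void send(String payload, Target target) {
        try {
            target.deliver(payload);
        } catch (Exception e) {
            failed.addLast(payload);
        }
    }

    /** Retries every stored payload once; returns how many were delivered. */
    int resubmitAll(Target target) {
        int resubmitted = 0;
        int pending = failed.size();
        for (int i = 0; i < pending; i++) {
            String payload = failed.pollFirst();
            try {
                target.deliver(payload);
                resubmitted++;
            } catch (Exception e) {
                failed.addLast(payload); // still failing, keep it for later
            }
        }
        return resubmitted;
    }

    int size() {
        return failed.size();
    }
}

final class FailedEventStoreDemo {
    public static void main(String[] args) {
        AtomicBoolean targetUp = new AtomicBoolean(false);
        List<String> delivered = new ArrayList<>();
        FailedEventStore.Target target = payload -> {
            if (!targetUp.get()) {
                throw new IllegalStateException("target queue manager stopped");
            }
            delivered.add(payload);
        };

        FailedEventStore store = new FailedEventStore();
        store.send("m1", target);               // target is down, event is stored
        System.out.println(store.size());       // 1

        targetUp.set(true);                     // root cause fixed
        System.out.println(store.resubmitAll(target)); // 1
        System.out.println(delivered);          // [m1]
    }
}
```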
To continue data synchronization, you must complete the outstanding Human Task. Ensure that the output checkbox in the Task Output Message is selected and click Complete.
You can now continue to submit messages for synchronization by putting messages to Inbound_Queue using WebSphere MQ Explorer.
Note that you can terminate the process instance at any point in time from the Process Instances > Administered By Me panel of the BPC Explorer.
WebSphere Process Server provides a functionally rich, component-based programming model that is optimized for implementing modular solutions to a wide range of business problems. These solutions are frequently required to establish integration between multiple disparate endpoint systems. Therefore, the Integration Architect responsible for solution design must consider operational differences between endpoint systems, such as availability and concurrent processing capability. This article described how to use the Process Server programming model to implement an intelligent store and forward capability that provides near real-time responsiveness during an unexpected unavailability of a target endpoint system.
The original motivation for this article came about as a result of discussions with my IBM Software Group colleague, Paul Verschueren. Paul's original suggestion of a high level approach to implement an intelligent store and forward pattern is gratefully acknowledged.
Download: Code sample, IntelligentStoreForward_PI.zip, 66 KB