Part 1 of this series was concerned with the implementation of an intelligent store and forward capability that suspends the forwarding of workload requests to endpoint systems in the event of an outage. Part 2 presents the implementation details of a workload throttling approach that you can use to impose a limit on the rate at which messages are propagated to a given endpoint system. An obvious practical application of this approach is the prevention of flooding endpoint systems that cannot process high volumes of concurrent requests.
We continue with the approach adopted in Part 1, which involves the presentation of the bare minimum implementation scenario to illustrate our intended throttling capability. However, note that the modular nature of SCA allows components similar to those featured here to be plugged in to a WebSphere Process Server solution. This allows existing solutions that use an asynchronous endpoint integration mechanism to be augmented with our simple throttling capability.
The illustrative scenario is shown in Figure1. QueueLoader is implemented as a simple POJO that loads the queue by iteratively propagating a copy of the incoming message to the downstream components of the solution via Throttle_Import, an SCA import with a JMS binding. This results in the loading of the JMS queue configured on the Throttle_Import definition with a set of similar messages.
Figure 1. High level overview of workload throttling scenario
Note that in a more realistic scenario, QueueLoader will be replaced by a component that encapsulates useful business functionality, perhaps a BPEL process.
EndpointStub represents the downstream component of our workload throttling scenario. It is configured to accept workload in the form of JMS messages via Throttle_Export, an SCA export with JMS binding.
The core throttling behavior is provided by configuring a WebSphere Scheduler instance to repeatedly invoke a specified EJB at a specified time interval. The EJB is responsible for simply transferring a message from the source JMS queue corresponding to Throttle_Import to the target JMS Queue corresponding to Throttle_Export. A simple Web application is provided that provides an interface to allow the management of throttling behavior.
The scenario described was developed using WebSphere Integration Developer V6.2.0.1, and tested using the WebSphere Process Server V6.2.0.1 integrated test environment.
We use a simple interface between all the components that make up the end-to-end scenario, as shown in Figure 2.
Figure 2. Scenario interface definition
The interface contains a single one-way operation that passes an instance of the request business object, as shown in Figure 3.
Figure 3. Request business object definition
We leverage a WebSphere Scheduler instance to provide the mechanism that incrementally invokes the message transfer capability. Although we can define a new Scheduler instance, for this example, we reuse an existing instance that you can view from the WebSphere Process Server Admin Console. The key value to note here is the JNDI name, which is used during throttle management and is described later in this article.
Figure 4. Predefined WebSphere Scheduler instances
The Scheduler invocation target is a stateless session EJB that implements the following interfaces:
- com.ibm.websphere.scheduler.TaskHandler
- com.ibm.websphere.scheduler.TaskHandlerHome
Listing 1 shows the Message Transfer EJB initialization code that is contained within the ejbCreate() method. The method retrieves a handle to a JMS QueueConnectionFactory that has been configured to provide access to a service integration bus (SIB) instance SCA.APPLICATION.widCell.bus and the JMS queues that correspond to those specified in the JMS Import and Export shown in Figure 1.
Listing 1. Message Transfer EJB initialization
public void ejbCreate() throws javax.ejb.CreateException
{
InitialContext ic = null;
try
{
ic = new InitialContext();
//Get the JMS QueueConnectionFactory
qcf = (QueueConnectionFactory) ic.lookup("jms/QCF");
//Get the JMS Queues
targetQueue = (Queue) ic.lookup("Throttle/Throttle_Export_RECEIVE_D");
sourceQueue = (Queue) ic.lookup("Throttle/Throttle_Import_SEND_D");
}
catch (NamingException e)
{
e.printStackTrace();
return;
}
}
|
The Queue Connection Factory definition that is used by the Message Transfer EJB to establish connectivity to the SIB instance is shown in Figure 5. Note that if security is enabled for the SIB instance specified in the Bus name field, it is necessary to specify an authentication alias via the component-managed authentication alias. For simplicity, we have only shown the upper portion of the Queue Connection Factory definition dialog. If necessary, you can specify the authentication alias via the component-managed authentication alias field in the lower portion of the dialog.
Figure 5. JMS Queue Connection Factory definition
Message Transfer is implemented within the process() method, which is the target of the repeated invocation from the configured WebSphere Scheduler instance. This method transfers a single message from the source to the target JMS queue at each invocation. The essential aspects of this message transfer code are shown in Listing 2.
Listing 2. Message Transfer EJB logic
public void process(TaskStatus arg0) throws RemoteException
{
Vector res = new Vector();
boolean isSecurityException = false;
boolean canAccessQueue = true;
QueueConnection con = null;
try
{
con = qcf.createQueueConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(sourceQueue);
con.start();
Message msg = consumer.receive(3000L);
if(msg instanceof TextMessage)
{
System.out.println("Text Message .... " + msg.toString());
writeMessage(msg);
}
session.close();
con.close();
}
catch (JMSException e)
{
e.printStackTrace();
}
}
|
The transfer of messages underpinning the throttling mechanism is managed
from the Throttle Manager, which is implemented as a servlet. Listing 3
shows how the transfer of messages can be started. The
doGet()method looks up the Application
Scheduler and scheduled EJB instances. A
BeanTaskInfo instance is used to represent
the desired scheduling characteristics, and is sent to the scheduler instance to
trigger the start of task scheduling. A scheduled task identifier is
retrieved from the scheduler instance and returned to the start operation
requester. This identifier may be needed subsequently to stop task
scheduling. Note that you can extend the code shown in Listing 3 to allow
additional management operations, such as stopping or modifying the
interval between successive message transfers.
Listing 3. Throttle Manager servlet logic
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
PrintWriter writer = response.getWriter();
String operation = request.getParameter("operation");
if (operation.equalsIgnoreCase("start"))
{
try
{
// Retrieve scheduling interval parameter
String interval = request.getParameter("interval");
InitialContext ic = new InitialContext();
Scheduler scheduler = (Scheduler)ic.lookup("AppScheduler");
// Look up scheduled EJB
Object o = new InitialContext().lookup
("ejb/com/ibm/websphere/scheduler/TaskHandlerHome");
TaskHandlerHome home = (TaskHandlerHome)
javax.rmi.PortableRemoteObject.narrow(o,TaskHandlerHome.class);
// BeanTaskInfo is used to specify scheduling details
BeanTaskInfo taskInfo = (BeanTaskInfo)
scheduler.createTaskInfo(BeanTaskInfo.class);
// create a date object which represents 10 seconds from now
Date startDate = new Date(System.currentTimeMillis()+10000);
// now set the start time and task handler to be called in the task info
taskInfo.setTaskHandler(home);
taskInfo.setStartTime(startDate);
taskInfo.setNumberOfRepeats(-1);
taskInfo.setRepeatInterval(interval);
TaskStatus ts = scheduler.create(taskInfo);
writer.println("Task created with id:" + ts.getTaskId());
}
catch (Exception e )
{
System.out.println("Exception caught " + e);
}
}
|
To facilitate the demonstration of the throttling approach, we have provided a POJO that implements Throttle_Interface. Upon invocation of the operation1 method, the Queue Loader component retrieves the number of messages to be written, and then incrementally invokes the target reference. Since the reference is wired to an SCA import with a JMS binding, the message is written to the underlying JMS queue by the SCA infrastructure, as shown in Listing 4.
Listing 4. JMS Queue Loader logic
public void operation1(DataObject input)
{
ServiceManager serviceManager = new ServiceManager();
com.ibm.websphere.sca.Service target = (Service)
serviceManager.locateService("Throttle_InterfacePartner");
// Retrieve number of messages to be written to Queue
int msgCount = input.getInt("messageCount");
System.out.println("Number of Messages to be written = " + msgCount);
for (int i = 1; i <= msgCount; i++)
{
input.setString("messageNumber", Integer.toString(i));
target.invokeAsync("operation1", input);
}
}
|
The configuration details of the SCA import that is wired to the Queue Loader component are shown in Figure 6.
Figure 6. SCA Import JMS Binding Configuration
The final runtime artifact contained within our scenario is also implemented as a POJO. The Endpoint Stub provides evidence of the post-throttling arrival of the transmitted message by parsing the messageNumber business object field and writing out an appropriate informational message to the system log, as shown in Listing 5.
Listing 5. Endpoint stub logic
public void operation1(DataObject input)
{
System.out.println("Message Consumer Endpoint - Entry");
String msgNumber = input.getString("messageNumber");
System.out.println("Message Consumer Endpoint - msgNumber = " + msgNumber);
System.out.println("Message Consumer Endpoint - Exit");
}
|
The name of the target JMS queue that represents the target queue for the Message Transfer bean can be viewed in the configuration details of the JMS SCA Export.
Figure 7. SCA Export JMS Binding configuration
Let's now walk through a demonstration of the throttling capability described above. Deploy the applications that are provided as accompanying downloads to this article to your WebSphere Process Server instance:
Configure Tivoli Performance Viewer
We will use the Tivoli® Performance Viewer provided with the WebSphere Process Server Admin Console to monitor the throttled propagation of workload requests.
- To collect the data required to enable this monitoring capability,
navigate to Monitoring and Tuning > Performance Monitoring
Infrastructure (PMI) and ensure that monitoring is enabled, as
show in Figure 8.
Figure 8. Enable Performance Monitoring
- Switch to the Runtime tab, click Custom and navigate to SIB Service > SIB Messaging Engines > widNode.server1-SCA.APPLICATION.widCell.Bus > Destinations > Queues.
- You now see entries for the JMS queues that have been configured to
support the JMS SCA import and SCA export for the throttling
mechanism. Now enable the collection of data to support one statistic
for each JMS queue, as defined in Table 1.
JMS queue name Statistic Throttle.Throttle_Import_SEND_D_SIB AvailableMessagesCount Throttle.Throttle_Export_RECEIVE_D_SIB TotalMessagesConsumedCount
- To enable each statistic, click JMS Queue Name and select the corresponding statistic from the checkbox to the right-hand side of the displayed statistics table. Click the Enable button located above the table. Perform this sequence of actions to enable both statistics summarized in Table 1.
- You can now view a graphical output of these statistics over time by navigating to Monitoring and Tuning > Performance Viewer > Current Activity > Server1, and then selecting Performance Modules > SIB Service.
Now that you have configured Tivoli Performance Viewer to monitor the transfer of messages through our scenario, you can proceed to inject the workload:
- From the Throttle assembly diagram, right-click
QueueLoaderExport and select Test Component from the
context menu. Enter a value of
50into the messageCount field and click Continue. - Select the test server instance upon which to execute the test case. If security is enabled upon the test server, you are required to enter a user ID and password before being allowed to proceed.
- Upon invocation, the QueueLoader component will write the requested
number of messages to the JMS queue. The component test harness Events
window is updated to reflect the writing of the requested number of
messages to the JMS queue, as shown in Figure 9.
Figure 9. WebSphere Integration Developer component testing environment
Observe messages on SCA import JMS queue
You can observe the messages residing on the JMS queue by navigating to Service integration > Service Integration bus explorer. Select Destinations > Throttle.Throttle_Import_SEND_D_SIB > Queue Points, and then click the displayed Queue points, as shown in Figure 10.
Figure 10. Source JMS Queue Point showing message depth
You can now start the Message Transfer via the Throttle Manager servlet. Access the following URL from a Web browser:
http://localhost:nnnn/ThrottleControl/Manager?operation=start&interval=10seconds |
The Throttle Manager servlet will respond with a message that provides a task identifier. Make a note of this value if you plan to stop message transfer in the future.
The controlled transfer of messages, representing workload throttling, can be monitored from the Tivoli Performance Viewer session that we configured earlier. Figure 11 shows a graphical view of the statistics that we configured earlier as a function of time. This view provides an easily understandable representation of our throttling capability by illustrating how messages are gradually transferred from the source to target JMS queues, where they are consumed by the Endpoint Stub component in our simple scenario.
Figure 11. Tivoli Performance Viewer illustration of throttled message transfer
The implementation of process-based solutions often requires integrating widely differing endpoint systems. Sometimes these systems are unable to handle large volumes of concurrent messages. A tactical way to handle this situation is implementing a throttling capability within the integration middleware layer. WebSphere Process Server provides a rich set of underlying capabilities that can implement a solution for this type of integration challenge. This article described the JMS-compliant message bus and scheduler capabilities to implement a simple approach to asynchronous workload throttling. It also showed that you can easily monitor the progress of a throttled workload via the use of a performance monitoring utility, which is provided as part of the WebSphere Process Server administrative console.
| Description | Name | Size | Download method |
|---|---|---|---|
| Code sample | code_sample.zip | 51 KB | HTTP |
Information about download methods
Learn
-
WebSphere
Process Server throughput management: Part 1 – Intelligent store and forward capability
-
WebSphere Process Server Information Center
-
developerWorks WebSphere business process management zone
Discuss

Alan Hopkins is a Consulting IT Specialist with more than 20 years experience in IBM and related middleware technologies. He is currently a member of IBM Software Group Services Worldwide Technology Practice, based at the IBM Hursley Development Lab in the United Kingdom. For the past several years, Alan has focused primarily on the WebSphere Business Process Management technology stack.




