Building an aggregation function using WebSphere ESB

Many consider the aggregation function as an integral part of an Enterprise Service Bus (ESB). While the IBM WebSphere ESB product does not provide an aggregation function and includes no aggregation primitives, you can write your own. This article provides two sample aggregation primitives, explains how they were created, and shows you how to use them as templates to create your own, more complete versions.

Russell Butek (butek@us.ibm.com), Software Engineer, EMC

Russell Butek is an IBM Web services consultant. He has been one of the developers of the IBM WebSphere Web services engine. He is also a member of the JAX-RPC Java Specification Request (JSR) expert group. He was involved in the implementation of Apache's AXIS SOAP engine, driving AXIS 1.0 to comply with JAX-RPC 1.0. Previously, he was a developer of the IBM CORBA ORB and an IBM representative on a number of OMG task forces, including chairing the portable interceptor task force. Contact Russell at butek@us.ibm.com.



15 August 2007

Introduction

This article describes how to build an ESB aggregation function via user-defined mediation primitive plug-ins. It provides two new sample primitives to get you started. While they do not address issues such as exception handling that you would need to address in a production environment, they can serve as templates for building your own fully functional primitives. This article has three parts:

  • A brief description of aggregation functions appropriate for an ESB architecture.
  • A guide to the aggregation mediation primitive plug-ins provided with this article.
  • An explanation of how to build your own aggregation mediation primitive plug-ins.

Aggregation functions appropriate for an ESB

What, exactly, is aggregation? There are many definitions, but we will use one relevant to an ESB:

ESB aggregation function
An ESB aggregation function collates responses from multiple service invocations into a single response. It handles infrastructure logic and does not apply business logic.

So what constitutes business logic? For the purposes of this article, it is anything that supplies semantics to the data. Any meaning in the data is the domain of business logic. For example, it is okay for an ESB to know that data is a string, integer, or enumeration type, which may be necessary when performing ESB functions such as transformations. But knowing the meaning of data -- for instance, that a particular number is a zip code, part number, or result of a cosine function -- should not be necessary for an ESB to perform its tasks.

What belongs in an ESB can be a heated topic, and one could argue that the aggregation function resides in the grey area between ESB and business processes. But this introduction should help you understand the reasoning behind the rest of this article.

When the aggregation function is narrowed by this definition, a large set of functions still applies, including:

Collating like responses

The ESB could call a set of services, each returning the same logical data (the physical structure of the data may be different, but the ESB transformation functionality can normalize it). For example, an inventory could be spread through multiple warehouses. An application may want to know the total inventory for a given item, but doesn't know how to query all of the warehouses. An ESB could provide this application with a single service that internally calls each warehouse's service.

You might think that the ESB could tally all inventory counts and return a single result. But that would involve stepping over the line between infrastructure and business logic, because the result has semantics -- the knowledge that the values can be summed -- which is not appropriate for an ESB. A more appropriate scenario would involve the ESB service returning an array of inventories, with each array element representing the result from a single warehouse. The ESB should not process the data, but merely collate it.
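The difference between collating and processing can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the names InventoryCollator, WarehouseCount, and collate are hypothetical, each warehouse service is modeled as a simple Supplier, and the code assumes Java 8 or later. None of it comes from WebSphere ESB or from the download for this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration only: none of these names come from WebSphere ESB.
final class InventoryCollator {

  // One warehouse's answer, kept exactly as the warehouse reported it.
  static final class WarehouseCount {
    final String warehouse;
    final int count;

    WarehouseCount(String warehouse, int count) {
      this.warehouse = warehouse;
      this.count = count;
    }
  }

  // Call every warehouse service and gather the answers into one list,
  // in the order the services were supplied.
  // Deliberately no summing here: totalling the counts would give the data
  // meaning, which this article treats as business logic for the caller.
  static List<WarehouseCount> collate(List<Supplier<WarehouseCount>> warehouseServices) {
    List<WarehouseCount> results = new ArrayList<>();
    for (Supplier<WarehouseCount> service : warehouseServices) {
      results.add(service.get());
    }
    return results;
  }
}
```

The consumer receives one element per warehouse and decides for itself whether the counts should be summed, compared, or displayed side by side.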

Collating dissimilar responses
An ESB could provide a service that collects information from multiple sources and combines this information into a single response. For example, customer data might be stored in various databases, each of which provides a service to access its particular data. The ESB could call each of these services and build an aggregate structure containing all of this data in one place. The application that uses this ESB only has to make a single service call.

Enough architectural discussion -- let's try an actual implementation. We will build an ESB aggregation mediation flow that runs on WebSphere® ESB and is developed in WebSphere Integration Developer.

At the bottom of this article, you can download a Project Interchange file and load it into WebSphere Integration Developer. It contains everything you need to install the two new aggregation primitives, plus the example that this article demonstrates.

Guide to the aggregation mediation primitive

The aggregation described in this article encompasses the class of function described above as "collating dissimilar responses." This function uses two new mediation primitives:

FanOut
Resides on a request flow. It duplicates the incoming message and sends it to a set of callouts.
FanIn
Resides on a response flow. It collects the responses from the fanned-out messages into a single response.

The primitives shown here are just examples to show you how to build your own aggregation function. They are not fully functional, production-quality artifacts. Therefore the last section of this article, "Building your own aggregation primitive," is the most important. But to understand how to build these primitives, you first must understand how they are intended to be used.

FanOut

The input terminal of this primitive receives the aggregate interface's request message. This same message must go out on all output terminals. Since the aggregate interface's request message is different than each service's request message, a user will likely wire an XSL transformation primitive after each FanOut output terminal.

FanIn

The output terminal of this primitive sends a single, aggregated response message back to the service consumer. All input terminals must be of the same type as the output terminal. The response body of the aggregate interface must contain a block of data for each individual service call's response. So if you have the following set of service calls:

  • Service call 1 returns DataType1.
  • Service call 2 returns DataType2.
  • . . .
  • Service call n returns DataTypeN.

Then the aggregate interface's returned data type must look like the structure in Listing 1:

Listing 1. Aggregate data type format
<xsd:complexType name="aggregateType">
  <xsd:sequence>
    <xsd:element name="data1" type="DataType1"/>
    <xsd:element name="data2" type="DataType2"/>
    ...
    <xsd:element name="dataN" type="DataTypeN"/>
  </xsd:sequence>
</xsd:complexType>

The types need not be in the same order nor defined by the same schema. They should logically contain the same data, but the actual structure can be massaged by transformation primitives as needed. In fact, you must wire an XSL transformation primitive in front of each FanIn input terminal and place the service's response data into the equivalent subtype of the aggregate response data. And you must tell the FanIn primitive which subtype applies for each input terminal, which you do via the primitive properties. You create one row for each input terminal and provide an XPath to the relevant subtype.
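The idea that each input terminal contributes exactly one subtype of the aggregate can be modeled without any WebSphere classes. In the sketch below the aggregate message is reduced to a map from subtype name to value. The class SubtypeMerge and its collect method are hypothetical, and a real primitive would address the subtype with the configured XPath rather than a map key.

```java
import java.util.Map;

// Hypothetical model: the aggregate message is a map from subtype name to value.
final class SubtypeMerge {

  // Copy only the slot that this input terminal is responsible for.
  // Anything else present in the partial response is ignored.
  static void collect(Map<String, Object> aggregate,
                      Map<String, Object> partial,
                      String relevantSubtype) {
    aggregate.put(relevantSubtype, partial.get(relevantSubtype));
  }
}
```

Because only the named slot is copied, a transformation that leaves other parts of the aggregate type empty or stale cannot overwrite data that a different terminal has already supplied.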

The FanIn primitive does not flow a response from its output terminal until all incoming responses have been received and their subtypes collected. This behavior can easily result in a hung response if there is a problem getting a response from any of the aggregated service calls. This is one of the areas that you must expand upon in order to make these primitives production-ready.
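One possible starting point for that work is to record when each aggregation began and to sweep out the ones that have waited too long. The following is a minimal sketch, not part of the sample primitives: the class PendingAggregations and its methods are hypothetical, the clock value is passed in by the caller, and the sketch does not decide what should happen to an expired aggregation (returning a partial response or raising a fault are both options you would have to design).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the article's download or of WebSphere ESB.
final class PendingAggregations {
  private final Map<String, Long> startedAtMillis = new HashMap<>();
  private final long timeoutMillis;

  PendingAggregations(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Record when the first response for a correlation ID arrived.
  // Later responses for the same ID do not reset the clock.
  synchronized void start(String correlationId, long nowMillis) {
    startedAtMillis.putIfAbsent(correlationId, nowMillis);
  }

  // Forget an aggregation once its aggregate response has been fired.
  synchronized void complete(String correlationId) {
    startedAtMillis.remove(correlationId);
  }

  // Remove and return every correlation ID that has waited longer than the timeout.
  synchronized List<String> removeExpired(long nowMillis) {
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Long> entry : startedAtMillis.entrySet()) {
      if (nowMillis - entry.getValue() > timeoutMillis) {
        expired.add(entry.getKey());
      }
    }
    for (String id : expired) {
      startedAtMillis.remove(id);
    }
    return expired;
  }
}
```

Something still has to call removeExpired periodically, for example a timer, and how that fits into the mediation runtime is outside the scope of this sketch.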

Correlation context

The response messages must be correlated to the request messages so that the FanIn primitive can return the correct responses for the given requests. Response messages are correlated to request messages via the correlation context in the SMO. However, note that the correlation context field in the SMO is a user field, and is not intended to be used by the internals of mediation primitives. Since these primitives do rely on this field, its behavior must be made known to the user. If the user is not using the correlation context, then everything is fine, but if the user does use the correlation context, then, in order to function with the aggregation primitives, the BO that they construct must have a string attribute named fanOutID dedicated to the use of the aggregation primitives. That's all there is to this aggregation function. Now let's look at an example.

Example of an aggregation function

This example has three customer-related service interfaces that need to be combined into a single, aggregated interface. Figures 1 through 3 show the interfaces of each of the services, along with the corresponding business objects. Figure 4 shows you the aggregate interface and its business object. The project interchange file provided with this article contains this example as well as the projects for the aggregate primitives. Load this PI file into WebSphere Integration Developer if you want to follow this description in the actual implementation:

Figure 1. The Rolodex service
The Rolodex service
Figure 2. The Accounting service
The Accounting service
Figure 3. The Statistics service
The Statistics service
Figure 4. The Aggregate interface
The Aggregate interface

In this example, the Aggregate interface's Address, Account, and Age types are the same as the corresponding types in the Rolodex, Accounting, and Statistics services, respectively. But they don't have to be. At a minimum, you must wire transformation primitives around the aggregation primitives to change the namespaces of the messages. These transformations can also massage the data structures into whatever form is needed.

The Aggregate module is built as a WebSphere ESB mediation module. Figure 5 below shows the assembly diagram for this Aggregate module. At this level it's obvious how to assemble the components. You have an import for each of the back-end services, an export for this Aggregate module, and the mediation component that ties them together:

Figure 5. The Aggregate module assembly diagram
The Aggregate module assembly

Figures 6 and 7 show the request and response mediation flows for the mediation component:

Figure 6. The Aggregate module's request mediation flow
The Aggregate module's request mediation flow
Figure 7. The Aggregate module's response mediation flow
The Aggregate module's response mediation flow

In these two figures, look at the mediation primitive palette on the left. There are two new icons -- our new FanIn and FanOut primitives. As you can see in the figures, the request flow contains the FanOut, and the response flow contains the FanIn.

When you initially drop the FanOut primitive onto the workspace, it has no output terminals. Right-click and select Add Output Terminal for each fan-out that you will wire, and do likewise for input terminals on the FanIn primitive. On the FanOut primitive, as on most primitives, you need not worry about configuring the terminal types. Once you wire the input terminal, the output terminals all acquire the same interface as the input.

The FanIn primitive is a bit more complex. You can wire the output terminal, but the input terminals remain undefined. You must manually set their interfaces: right-click on each terminal (select the terminal box on the primitive, not the primitive itself), select Change Message Type as shown in Figure 8, and then describe the message in the pop-up window as shown in Figure 9:

Figure 8. Setting an input terminal's interface.
Set the interface
Figure 9. Describing an input terminal's interface.
Describe the interface

Wiring FanOut is straightforward. You must wire each FanOut output terminal to an XSL transformation primitive. Even if the data types being sent are identical, the namespaces will be different, as in our example. The data in all cases is a simple string. To inspect the actual transformations, see the project interchange file.

Wiring the FanIn is the reverse of the FanOut. You have multiple inputs instead of multiple outputs, and each transformation occurs before the FanIn. The extra trick here is that your transformation will fill in only a piece of the aggregation type -- the piece that's coming back from the given service. You have to tell the FanIn primitive, via its properties, which part of the aggregation type you are filling in for each input so that it can collect it (see Figure 10 for a view of the properties tab for our example's FanIn primitive). The names of the input terminals used here are the default ones created when you add the terminals. Once the primitive collects all of the pieces, it sends the response back to the caller.

Figure 10. Configure the FanIn primitive.
Configure the FanIn primitive

Building your own aggregation primitives

This article does not describe how to build a custom mediation primitive plug-in. For details on how to do that, see Building your own mediation primitive with WebSphere Integration Developer and WebSphere Enterprise Service Bus.

A mediation primitive is made up of two parts: the plug-in project and the implementation project. In the project interchange file included with this article, the fan-out projects are called com.ibm.wesb.aggregation.fanout and FanOut, while the fan-in projects are called com.ibm.wesb.aggregation.fanin and FanIn.

FanOut

The only thing special about the FanOut plug-in project is that it defines a dynamic set of output terminals. You define this feature very simply: you create a single output terminal category and you select dynamic for the value of the isDynamic field:

Figure 11. Define the FanOut's dynamic output terminals
Define the FanOut's dynamic output terminals

Other than the dynamic output terminals, the FanOut plug-in project is a typical mediation primitive plug-in, except that it has no properties. The implementation for the FanOut primitive is shown in Listing 2 below. It is very simple -- it just places a unique ID onto the message's correlation context, and fires the message to each output terminal.

As described above, these aggregation primitives make use of the correlation context. The FanOut implementation has to do a little work in the method setUUIDOnMessage to determine whether the user is already using the correlation context, but the details are not relevant to this article, so they don't appear in Listing 2. If you want to see those details, open the project interchange file.
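To make the idea concrete without reproducing the code in the project interchange file, here is a plain-Java model of the tagging step. It is not the sample's setUUIDOnMessage method: the correlation context is modeled as a simple map instead of a business object in the SMO, and the class FanOutCorrelation and its tag method are hypothetical. Only the attribute name fanOutID comes from this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the correlation context; the real one is a BO in the SMO.
final class FanOutCorrelation {
  static final String FAN_OUT_ID = "fanOutID";

  // Return a copy of the user's correlation data with a fresh fanOutID added.
  // Existing user fields are preserved; the caller's map is not modified.
  static Map<String, Object> tag(Map<String, Object> userCorrelation) {
    Map<String, Object> tagged = new HashMap<>();
    if (userCorrelation != null) {
      tagged.putAll(userCorrelation);
    }
    tagged.put(FAN_OUT_ID, UUID.randomUUID().toString());
    return tagged;
  }
}
```

The model covers both cases described earlier: a user who has no correlation data gets a context holding only the ID, and a user who already has correlation data keeps it alongside the ID.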

The importance of the mediate method is that it fires the message to all output terminals. It gets the list of output terminals from the mediation services (the method getMediationServices is a method on the base class, ESBMediationPrimitive). It then simply loops through this list, firing the message to each output terminal:

Listing 2. The implementation of FanOut's mediate function
public void mediate(InputTerminal inputTerminal, DataObject message)
        throws MediationConfigurationException, MediationBusinessException {
  setUUIDOnMessage(message);

  // Send the message to all of the output terminals
  List outputs = getMediationServices().getOutputTerminals();
  if (outputs != null) {
    for (int i = 0; i < outputs.size(); ++i) {
      OutputTerminal output = (OutputTerminal) outputs.get(i);
      if (output != null) {
        output.fire(message);
      }
    }
  }
}

FanIn

The FanIn primitive is somewhat more complex than the FanOut primitive. For FanIn, the input terminal is dynamic instead of the output terminal. But the special part of this primitive, from the plug-in point of view, is the property. Not only does this primitive have a property; it is a table property and not a simple property. Listing 3 shows what the property groups file looks like for a table property:

Listing 3. The fanin.xml property groups file
<pg:BasePropertyGroups
    name="CacheReaderPropertyGroups"
    resourceBundle="ESBMediationExamples"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:pg="http://www.ibm.com/propertygroup/6.0.1"> 
  <propertyGroup
      name="FanInGroup"
      xsi:type="pg:BasePropertyGroup" >
    <property name="collectors" displayName="Collectors" xsi:type="pg:TableProperty">
      <description>
        This table describes which portion of the aggregation fan-in message is relevant
        for each input terminal.
      </description>
      <qualifier preferredHeight="100" xsi:type="pg:TableLayoutQualifier">
        <column
            name="inputTerminal" 
            preferredWidth="100" 
            xsi:type="pg:TableColumnQualifier"/>
        <column
            name="relevantSubtype" 
            preferredWidth="250" 
            xsi:type="pg:TableColumnQualifier"/>
      </qualifier>
      <column
          name="inputTerminal" 
          required="true" 
          validValuesEditable="true" 
          id="com.ibm.propertygroup.ext.ui.TextProperty" 
          displayName="Input terminal" 
          propertyType="string" 
          xsi:type="pg:ConstraintSingleValuedProperty"/>
      <column
          name="relevantSubtype" 
          required="true" 
          validValuesEditable="true"
          displayName="Relevant subtype" 
          id="com.ibm.propertygroup.ext.ui.XPathProperty" 
          propertyType="string" 
          xsi:type="pg:ConstraintSingleValuedProperty">
        <qualifier 
            name="propertyType" 
            value="XPATH" 
            xsi:type="pg:GenericPropertyQualifier"/>		
      </column>
    </property>
  </propertyGroup> 
</pg:BasePropertyGroups>

Compare this property groups file to the corresponding properties screen shot in Figure 10 above. The table property is named collectors (the display name is Collectors) and has two columns: inputTerminal (display name Input terminal) and relevantSubtype (display name Relevant subtype). As shown in Figure 10, this XML file describes what the table will look like in the mediation flow editor's Properties panel. It also describes what this type should look like in Java. A table property maps to an array of a Java bean, which you have to write, and each column maps to a field in the bean. Listing 4 shows the Java bean for our collectors table property:

Listing 4. The Collector Java bean for the collectors table property
public class Collector {
  private String inputTerminal;
  private String relevantSubtype;

  public String getInputTerminal() {
    return inputTerminal;
  }

  public void setInputTerminal(String inputTerminal) {
    this.inputTerminal = inputTerminal;
  }

  public String getRelevantSubtype() {
    return relevantSubtype;
  }

  public void setRelevantSubtype(String relevantSubtype) {
    this.relevantSubtype = relevantSubtype;
  }
}
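A short usage sketch may help show how an array of such beans is consulted at run time. The terminal names and XPath strings below are made up for illustration, the helper methods row and subtypeFor are hypothetical, and the nested Collector class is only a minimal copy of the bean's shape so that the sketch compiles on its own.

```java
// Hypothetical usage sketch; terminal names and XPaths are invented for illustration.
final class CollectorLookup {

  // Minimal copy of the bean's shape so that this sketch compiles on its own.
  static final class Collector {
    private String inputTerminal;
    private String relevantSubtype;

    String getInputTerminal() { return inputTerminal; }
    void setInputTerminal(String inputTerminal) { this.inputTerminal = inputTerminal; }
    String getRelevantSubtype() { return relevantSubtype; }
    void setRelevantSubtype(String relevantSubtype) { this.relevantSubtype = relevantSubtype; }
  }

  // Build one table row: an input terminal name paired with an XPath.
  static Collector row(String inputTerminal, String relevantSubtype) {
    Collector collector = new Collector();
    collector.setInputTerminal(inputTerminal);
    collector.setRelevantSubtype(relevantSubtype);
    return collector;
  }

  // Return the XPath configured for the named input terminal,
  // or null if no row in the table matches.
  static String subtypeFor(Collector[] collectors, String inputTerminalName) {
    for (Collector collector : collectors) {
      if (inputTerminalName.equals(collector.getInputTerminal())) {
        return collector.getRelevantSubtype();
      }
    }
    return null;
  }
}
```

A null result means a terminal was added without a matching row in the properties table, which is a configuration mistake worth reporting clearly in a production-quality primitive.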

The implementation for the FanIn primitive is shown in Listing 5 below. Each section of code is described in the accompanying comments. At a high level, this method:

  • Acquires each response message
  • Places the relevant part of the response message into the aggregate message
  • When all responses have been received, fires the output terminal with the aggregate response

Listing 5. The implementation of FanIn's mediate function
public void mediate(InputTerminal inputTerminal, DataObject message)
    throws MediationConfigurationException, MediationBusinessException {

  // Get the fan-out correlation ID from the response message.
  String UUID = message.getString("/context/correlation/fanOutID");

  // Get the aggregated response message for this correlation ID.
  Responses responses = (Responses) globalResponses.get(UUID);

  // Get the name of the input terminal the message came in on.
  String inputTerminalName = inputTerminal.getName();

  // Loop through the set of collector properties, looking for the one that matches
  // the input terminal name.
  for (int i = 0; i < collectors.length; ++i) {
    if (inputTerminalName.equals(collectors[i].getInputTerminal())) {
      if (responses == null) {

        // We don't have an aggregated response, yet (this is the first response).
        // Use the incoming message as the start of the aggregate message.
        responses = new Responses();
        responses.aggregatedResponse = message;
        responses.fullBits = (1 << collectors.length) - 1;
        globalResponses.put(UUID, responses);
      }
      else {

        // Fill in aggregate response with this input terminal's portion of the
        // aggregation type.
        String subTypePath = collectors[i].getRelevantSubtype();
        DataObject element = message.getDataObject(subTypePath);
        responses.aggregatedResponse.set(subTypePath, element);
      }

      // There is a collection bit dedicated to each collection input terminal:
      //   the ones position bit is for the first input terminal;
      //   the twos position bit is for the second;
      //   the fours position bit is for the third, etc.
      // Set the bit for this input terminal.
      responses.collectionBits |= 1 << i;
      break;
    }
  }

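  // If we've collected all input responses and all of the collection bits are set,
  // fire the aggregate message through the output terminal. The null check guards
  // against a message that arrived on a terminal with no matching collector row.
  if (responses != null && responses.collectionBits == responses.fullBits) {
    // This aggregation is complete, so drop it from the shared map.
    globalResponses.remove(UUID);

    // Get the out terminal from the mediation services
    OutputTerminal outTerminal = getMediationServices().getOutputTerminal("output");

    if (outTerminal != null) {
      outTerminal.fire(responses.aggregatedResponse);
    }
  }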
}
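The bit bookkeeping in this listing can be tried out in isolation. The class below is a hypothetical, stand-alone model of it, not code from the sample. It also makes one limit explicit: in Java, shift distances on an int are taken modulo 32, so 1 << 32 evaluates to 1 and the full mask would come out as 0. The scheme as written therefore works for at most 31 input terminals.

```java
// Hypothetical, stand-alone model of the completion tracking in the FanIn listing.
final class CollectionBits {
  private final int terminalCount;
  private final int fullBits;
  private int collectionBits;

  CollectionBits(int terminalCount) {
    // 1 << 32 wraps around to 1 for an int, so reject counts the mask cannot hold.
    if (terminalCount < 1 || terminalCount > 31) {
      throw new IllegalArgumentException("terminalCount must be between 1 and 31");
    }
    this.terminalCount = terminalCount;
    // For n terminals this sets the lowest n bits, for example 3 terminals -> binary 111.
    this.fullBits = (1 << terminalCount) - 1;
  }

  // Mark the terminal at the given 0-based index as received.
  // Returns true once every terminal has reported at least once.
  boolean markReceived(int terminalIndex) {
    if (terminalIndex < 0 || terminalIndex >= terminalCount) {
      throw new IllegalArgumentException("terminalIndex out of range");
    }
    collectionBits |= 1 << terminalIndex;
    return collectionBits == fullBits;
  }
}
```

Because the update is a bitwise OR, a duplicate response on the same terminal does not change the state, so completion is reached only when each distinct terminal has been seen.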

That's all you have to do to build your own ESB aggregation function. Of course, as described above, making this function production-ready requires additional work, and this article has tried to give you the basis for doing that work.

Conclusion

The term aggregation encompasses a wide set of functions. This article has shown you how to build one particular example of an aggregation function appropriate for an ESB: an aggregation that collates dissimilar responses. This function is implemented using two user-defined mediation primitive plug-ins: a fan-out primitive that must be placed on the request flow, and a fan-in primitive that must be placed on the response flow. This example provides a framework on which you can build your own aggregation function.


Download

Description    Name                  Size
Sample code    Aggregation.PI.zip    82 KB
