Data quality
Configure the message flow nodes that are converted from the IBM App Connect Professional Data quality activities.
About this task
You converted your orchestration into a message flow. See Table 1, for information on which nodes are used for each of the activities.
Steps are split into sections and are intentionally brief to get you started quickly on configuring the nodes in your message flow. See linked topics in the table for more comprehensive information on configuring the nodes.
IBM App Connect Professional activity | IBM App Connect Enterprise node |
---|---|
Sort activity | and |
Merge activity | |
Lookup activity | and Compute node |
Filter and Profile activity |
Filter node and |
Mapping node for Sort activity
About this task
Procedure
JavaCompute node for Merge activity
About this task
Example
<Root>
<LeftEntries>
<entry x="1" y="10"/>
<entry x="2" y="20"/>
<entry x="2" y="21"/>
<entry x="3" y="30"/>
</LeftEntries>
<RightEntries>
<entry x="4" z="40"/>
<entry x="2" z="23"/>
<entry x="2" z="22"/>
<entry x="0" z="00"/>
</RightEntries>
</Root>
Merge and keep all duplicates for the provided input message:
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
import com.ibm.broker.plugin.MbXMLNSC;
public class Example_JavaCompute extends MbJavaComputeNode {
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
try {
// Copy input headers to output message
MbElement inRoot = inMessage.getRootElement();
MbElement outRoot = outMessage.getRootElement();
MbElement header = inRoot.getFirstChild();
while (header != null && header.getNextSibling() != null) {
outRoot.addAsLastChild(header.copy());
header = header.getNextSibling();
}
// Create output message structure
MbElement outXMLNSC = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement mergedEntries = outXMLNSC.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "entries", null);
// Extract datasets
MbElement leftEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/LeftEntries");
MbElement rightEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/RightEntries");
// Sort left and right inputs independently
List<Map<String, String>> leftList = parseAndSortElements(leftEntries, "x", "ascending");
List<Map<String, String>> rightList = parseAndSortElements(rightEntries, "x", "ascending");
// Merge and keep all duplicates
mergeSortedLists(leftList, rightList, mergedEntries);
out.propagate(outAssembly);
} catch (Exception e) {
throw new MbUserException(this, "evaluate()", "", "", e.toString(), null);
}
}
private List<Map<String, String>> parseAndSortElements(MbElement parent, String key, String order) throws MbException {
List<Map<String, String>> elements = new ArrayList<>();
MbElement child = parent.getFirstChild();
while (child != null) {
Map<String, String> elementData = new HashMap<>();
MbElement attribute = child.getFirstChild();
while (attribute != null) {
elementData.put(attribute.getName(), attribute.getValueAsString());
attribute = attribute.getNextSibling();
}
elements.add(elementData);
child = child.getNextSibling();
}
// Sort based on the specified key and order
Comparator<Map<String, String>> comparator = Comparator.comparingInt(o -> Integer.parseInt(o.get(key)));
if ("descending".equalsIgnoreCase(order)) {
comparator = comparator.reversed();
}
elements.sort(comparator);
return elements;
}
private void mergeSortedLists(
List<Map<String, String>> leftList,
List<Map<String, String>> rightList,
MbElement mergedEntries
) throws MbException {
int leftIndex = 0, rightIndex = 0;
// Merge while retaining all duplicates
while (leftIndex < leftList.size() && rightIndex < rightList.size()) {
Map<String, String> left = leftList.get(leftIndex);
Map<String, String> right = rightList.get(rightIndex);
int compare = Integer.compare(
Integer.parseInt(left.get("x")),
Integer.parseInt(right.get("x"))
);
if (compare < 0) {
createEntry(mergedEntries, left);
leftIndex++;
} else if (compare > 0) {
createEntry(mergedEntries, right);
rightIndex++;
} else {
createEntry(mergedEntries, left);
createEntry(mergedEntries, right);
leftIndex++;
rightIndex++;
}
}
// Add remaining entries from the left list
while (leftIndex < leftList.size()) {
createEntry(mergedEntries, leftList.get(leftIndex));
leftIndex++;
}
// Add remaining entries from the right list
while (rightIndex < rightList.size()) {
createEntry(mergedEntries, rightList.get(rightIndex));
rightIndex++;
}
}
private void createEntry(MbElement parent, Map<String, String> elementData) throws MbException {
MbElement entry = parent.createElementAsLastChild(MbElement.TYPE_NAME);
entry.setName("entry");
for (Map.Entry<String, String> field : elementData.entrySet()) {
entry.createElementAsLastChild(MbXMLNSC.ATTRIBUTE, field.getKey(), field.getValue());
}
}
}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
import com.ibm.broker.plugin.MbXMLNSC;
public class MeregandKeepleft_JavaCompute extends MbJavaComputeNode {
@Override
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
// Copy input headers to output message
MbElement inRoot = inMessage.getRootElement();
MbElement outRoot = outMessage.getRootElement();
MbElement header = inRoot.getFirstChild();
while (header != null && header.getNextSibling() != null) {
outRoot.addAsLastChild(header.copy());
header = header.getNextSibling();
}
// Create output message structure
MbElement outXMLNSC = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement outXMLNSC1 = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement mergedEntries = outXMLNSC.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "entries", null);
MbElement remainderXml = outXMLNSC1.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "RemainderXML", null);
try {
MbElement leftEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/LeftEntries");
MbElement rightEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/RightEntries");
if (leftEntries == null || rightEntries == null) {
throw new MbUserException(this, "evaluate()", "", "", "Missing LeftEntries or RightEntries in input XML", null);
}
List<Map<String, String>> leftList = parseAndSortElements(leftEntries, "x", "ascending");
List<Map<String, String>> rightList = parseAndSortElements(rightEntries, "x", "ascending");
mergeAndKeepLeftDuplicates(leftList, rightList, mergedEntries, remainderXml);
out.propagate(outAssembly);
} catch (Exception e) {
e.printStackTrace();
throw new MbUserException(this, "evaluate()", "", "", "Error during processing: " + e.getMessage(), null);
}
}
private List<Map<String, String>> parseAndSortElements(MbElement parent, String key, String order) throws MbException {
List<Map<String, String>> elements = new ArrayList<>();
MbElement child = parent.getFirstChild();
while (child != null) {
Map<String, String> elementData = new HashMap<>();
MbElement attribute = child.getFirstChild();
while (attribute != null) {
elementData.put(attribute.getName(), attribute.getValueAsString());
attribute = attribute.getNextSibling();
}
elements.add(elementData);
child = child.getNextSibling();
}
// Check for datatype and sort accordingly
Comparator<Map<String, String>> comparator;
try {
// Assume numeric sorting if the key is parseable as an integer
comparator = Comparator.comparingInt(o -> Integer.parseInt(o.get(key)));
} catch (NumberFormatException e) {
// Fallback to string-based sorting
comparator = Comparator.comparing(o -> o.get(key));
}
// Handle sort order
if ("descending".equalsIgnoreCase(order)) {
comparator = comparator.reversed();
}
// Perform sorting
elements.sort(comparator);
return elements;
}
private void mergeAndKeepLeftDuplicates(
List<Map<String, String>> leftList,
List<Map<String, String>> rightList,
MbElement mergedEntries,
MbElement remainderXml
) throws MbException {
List<Map<String, String>> mergedList = new ArrayList<>();
List<Map<String, String>> remainderList = new ArrayList<>();
Set<String> usedKeys = new HashSet<>();
// Add all left entries to the merged list
for (Map<String, String> left : leftList) {
mergedList.add(left);
usedKeys.add(left.get("x"));
}
// Add unique right entries to the merged list, duplicates to remainderList
for (Map<String, String> right : rightList) {
if (!usedKeys.contains(right.get("x"))) {
mergedList.add(right);
usedKeys.add(right.get("x"));
} else {
remainderList.add(right);
}
}
// Sort the merged list by key 'x' in ascending order
mergedList.sort(Comparator.comparingInt(o -> Integer.parseInt(o.get("x"))));
// Create merged entries in the output
for (Map<String, String> entry : mergedList) {
createEntry(mergedEntries, entry);
}
for (Map<String, String> entry : remainderList) {
createEntry(remainderXml, entry);
}
}
private void createEntry(MbElement parent, Map<String, String> elementData) throws MbException {
MbElement entry = parent.createElementAsLastChild(MbElement.TYPE_NAME);
entry.setName("entry");
for (Map.Entry<String, String> field : elementData.entrySet()) {
entry.createElementAsLastChild(MbXMLNSC.ATTRIBUTE, field.getKey(), field.getValue());
}
}
}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
import com.ibm.broker.plugin.MbXMLNSC;
public class MergeAndRemoveduplicates_JavaCompute extends MbJavaComputeNode {
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
// Copy input headers to output message
MbElement inRoot = inMessage.getRootElement();
MbElement outRoot = outMessage.getRootElement();
MbElement header = inRoot.getFirstChild();
while (header != null && header.getNextSibling() != null) {
outRoot.addAsLastChild(header.copy());
header = header.getNextSibling();
}
// Create output message structure
MbElement outXMLNSC = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement outXMLNSC1 = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement mergedEntries = outXMLNSC.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "entries", null);
MbElement remainderXml = outXMLNSC1.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "RemainderXML", null);
try {
// Extract input XML
MbElement leftEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/LeftEntries");
MbElement rightEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/RightEntries");
if (leftEntries == null || rightEntries == null) {
throw new MbUserException(this, "evaluate()", "", "", "Missing LeftEntries or RightEntries in input XML", null);
}
// Parse and sort both inputs in ascending order
List<Map<String, String>> leftList = parseAndSortElements(leftEntries, "x", "ascending");
List<Map<String, String>> rightList = parseAndSortElements(rightEntries, "x", "ascending");
// Perform merge while removing duplicates
mergeAndRemoveDuplicates(leftList, rightList, mergedEntries, remainderXml);
// Propagate the message
out.propagate(outAssembly);
} catch (Exception e) {
e.printStackTrace();
throw new MbUserException(this, "evaluate()", "", "", "Error during processing: " + e.getMessage(), null);
}
}
private List<Map<String, String>> parseAndSortElements(MbElement parent, String key, String order) throws MbException {
List<Map<String, String>> elements = new ArrayList<>();
MbElement child = parent.getFirstChild();
while (child != null) {
Map<String, String> elementData = new HashMap<>();
MbElement attribute = child.getFirstChild();
while (attribute != null) {
elementData.put(attribute.getName(), attribute.getValueAsString());
attribute = attribute.getNextSibling();
}
elements.add(elementData);
child = child.getNextSibling();
}
// Sort based on the specified key and order
Comparator<Map<String, String>> comparator = Comparator.comparingInt(o -> Integer.parseInt(o.get(key)));
if ("descending".equalsIgnoreCase(order)) {
comparator = comparator.reversed();
}
elements.sort(comparator);
return elements;
}
private void mergeAndRemoveDuplicates(
List<Map<String, String>> leftList,
List<Map<String, String>> rightList,
MbElement mergedEntries,
MbElement remainderXml
) throws MbException {
List<Map<String, String>> mergedList = new ArrayList<>();
List<Map<String, String>> remainderList = new ArrayList<>();
Map<String, Integer> keyCounts = new HashMap<>();
// Step 1: Count occurrences of 'x' values
for (Map<String, String> entry : leftList) {
keyCounts.compute(entry.get("x"), (k, v) -> (v == null) ? 1 : v + 1);
}
for (Map<String, String> entry : rightList) {
keyCounts.compute(entry.get("x"), (k, v) -> (v == null) ? 1 : v + 1);
}
// Step 2: Classify entries in one loop
for (Map<String, String> entry : leftList) {
(keyCounts.get(entry.get("x")) > 1 ? remainderList : mergedList).add(entry);
}
for (Map<String, String> entry : rightList) {
(keyCounts.get(entry.get("x")) > 1 ? remainderList : mergedList).add(entry);
}
// Step 3: Sort merged entries by 'x'
mergedList.sort(Comparator.comparingInt(o -> Integer.parseInt(o.get("x"))));
// Step 4: Create output XML entries
for (Map<String, String> entry : mergedList) {
createEntry(mergedEntries, entry);
}
for (Map<String, String> entry : remainderList) {
createEntry(remainderXml, entry);
}
}
private void createEntry(MbElement parent, Map<String, String> elementData) throws MbException {
MbElement entry = parent.createElementAsLastChild(MbElement.TYPE_NAME);
entry.setName("entry");
for (Map.Entry<String, String> field : elementData.entrySet()) {
entry.createElementAsLastChild(MbXMLNSC.ATTRIBUTE, field.getKey(), field.getValue());
}
}
}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
import com.ibm.broker.plugin.MbXMLNSC;
public class InnerJoin_JavaCompute extends MbJavaComputeNode {
@Override
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
// Copy input headers
MbElement inRoot = inMessage.getRootElement();
MbElement outRoot = outMessage.getRootElement();
MbElement header = inRoot.getFirstChild();
while (header != null && header.getNextSibling() != null) {
outRoot.addAsLastChild(header.copy());
header = header.getNextSibling();
}
// Create output message structure
MbElement outXMLNSC = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement outXMLNSC1 = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement joinedEntries = outXMLNSC.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "entries", null);
MbElement remainderXml = outXMLNSC1.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "RemainderXML", null);
try {
// Extract input XML
MbElement leftEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/LeftEntries");
MbElement rightEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/RightEntries");
if (leftEntries == null || rightEntries == null) {
throw new MbUserException(this, "evaluate()", "", "", "Missing LeftEntries or RightEntries in input XML", null);
}
// Parse and sort both inputs in ascending order
List<Map<String, String>> leftList = parseAndSortElements(leftEntries, "x", "ascending");
List<Map<String, String>> rightList = parseAndSortElements(rightEntries, "x", "ascending");
// Perform Inner Join and capture remainders
innerJoinWithRemainder(leftList, rightList, joinedEntries, remainderXml);
// Propagate the message
out.propagate(outAssembly);
} catch (Exception e) {
e.printStackTrace();
throw new MbUserException(this, "evaluate()", "", "", "Error during processing: " + e.getMessage(), null);
}
}
private List<Map<String, String>> parseAndSortElements(MbElement parent, String key, String order) throws MbException {
List<Map<String, String>> elements = new ArrayList<>();
MbElement child = parent.getFirstChild();
while (child != null) {
Map<String, String> elementData = new HashMap<>();
MbElement attribute = child.getFirstChild();
while (attribute != null) {
elementData.put(attribute.getName(), attribute.getValueAsString());
attribute = attribute.getNextSibling();
}
elements.add(elementData);
child = child.getNextSibling();
}
// Sort based on the specified key and order
Comparator<Map<String, String>> comparator = Comparator.comparingInt(o -> Integer.parseInt(o.get(key)));
if ("descending".equalsIgnoreCase(order)) {
comparator = comparator.reversed();
}
elements.sort(comparator);
return elements;
}
private void innerJoinWithRemainder(
List<Map<String, String>> leftList,
List<Map<String, String>> rightList,
MbElement joinedEntries,
MbElement remainderXml
) throws MbException {
Map<String, List<Map<String, String>>> rightMap = new HashMap<>();
Set<String> matchedKeys = new HashSet<>();
// Group Right Entries by 'x'
for (Map<String, String> right : rightList) {
rightMap.computeIfAbsent(right.get("x"), k -> new ArrayList<>()).add(right);
}
// Perform Inner Join
for (Map<String, String> left : leftList) {
String leftKey = left.get("x");
if (rightMap.containsKey(leftKey)) {
matchedKeys.add(leftKey); // Track matched keys
for (Map<String, String> right : rightMap.get(leftKey)) {
Map<String, String> joinedEntry = new HashMap<>(left);
joinedEntry.putAll(right);
createEntry(joinedEntries, joinedEntry);
}
}
}
// Add non-matching LeftEntries to RemainderXML
for (Map<String, String> left : leftList) {
if (!matchedKeys.contains(left.get("x"))) {
createEntry(remainderXml, left);
}
}
// Add non-matching RightEntries to RemainderXML
for (Map<String, String> right : rightList) {
if (!matchedKeys.contains(right.get("x"))) {
createEntry(remainderXml, right);
}
}
}
private void createEntry(MbElement parent, Map<String, String> elementData) throws MbException {
MbElement entry = parent.createElementAsLastChild(MbElement.TYPE_NAME);
entry.setName("entry");
for (Map.Entry<String, String> field : elementData.entrySet()) {
entry.createElementAsLastChild(MbXMLNSC.ATTRIBUTE, field.getKey(), field.getValue());
}
}
}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
import com.ibm.broker.plugin.MbXMLNSC;
public class Leftouterjoin_JavaCompute extends MbJavaComputeNode {
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
// Copy input headers
MbElement inRoot = inMessage.getRootElement();
MbElement outRoot = outMessage.getRootElement();
MbElement header = inRoot.getFirstChild();
while (header != null && header.getNextSibling() != null) {
outRoot.addAsLastChild(header.copy());
header = header.getNextSibling();
}
// Create output message structure
MbElement outXMLNSC = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement outXMLNSC1 = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement joinedEntries = outXMLNSC.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "entries", null);
MbElement remainderXml = outXMLNSC1.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "RemainderXML", null);
try {
// Extract input XML
MbElement leftEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/LeftEntries");
MbElement rightEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/RightEntries");
if (leftEntries == null || rightEntries == null) {
throw new MbUserException(this, "evaluate()", "", "", "Missing LeftEntries or RightEntries in input XML", null);
}
// Parse and sort both inputs in ascending order
List<Map<String, String>> leftList = parseAndSortElements(leftEntries, "x", "ascending");
List<Map<String, String>> rightList = parseAndSortElements(rightEntries, "x", "ascending");
// Perform Left Outer Join and capture remainders
leftOuterJoinWithRemainder(leftList, rightList, joinedEntries, remainderXml);
// Propagate the message
out.propagate(outAssembly);
} catch (Exception e) {
e.printStackTrace();
throw new MbUserException(this, "evaluate()", "", "", "Error during processing: " + e.getMessage(), null);
}
}
private List<Map<String, String>> parseAndSortElements(MbElement parent, String key, String order) throws MbException {
List<Map<String, String>> elements = new ArrayList<>();
MbElement child = parent.getFirstChild();
while (child != null) {
Map<String, String> elementData = new HashMap<>();
MbElement attribute = child.getFirstChild();
while (attribute != null) {
elementData.put(attribute.getName(), attribute.getValueAsString());
attribute = attribute.getNextSibling();
}
elements.add(elementData);
child = child.getNextSibling();
}
// Sort based on the specified key and order
Comparator<Map<String, String>> comparator = Comparator.comparingInt(o -> Integer.parseInt(o.get(key)));
if ("descending".equalsIgnoreCase(order)) {
comparator = comparator.reversed();
}
elements.sort(comparator);
return elements;
}
private void leftOuterJoinWithRemainder(
List<Map<String, String>> leftList,
List<Map<String, String>> rightList,
MbElement joinedEntries,
MbElement remainderXml
) throws MbException {
Map<String, List<Map<String, String>>> rightMap = new HashMap<>();
Set<String> matchedKeys = new HashSet<>();
// Group Right Entries by 'x'
for (Map<String, String> right : rightList) {
rightMap.computeIfAbsent(right.get("x"), k -> new ArrayList<>()).add(right);
}
// Perform Left Outer Join
for (Map<String, String> left : leftList) {
String leftKey = left.get("x");
if (rightMap.containsKey(leftKey)) {
matchedKeys.add(leftKey); // Track matched keys
for (Map<String, String> right : rightMap.get(leftKey)) {
Map<String, String> joinedEntry = new HashMap<>(left);
joinedEntry.putAll(right);
createEntry(joinedEntries, joinedEntry);
}
} else {
createEntry(joinedEntries, left); // Add left entry without matching right entry
}
}
// Add non-matching RightEntries to RemainderXML
for (Map<String, String> right : rightList) {
if (!matchedKeys.contains(right.get("x"))) {
createEntry(remainderXml, right);
}
}
}
private void createEntry(MbElement parent, Map<String, String> elementData) throws MbException {
MbElement entry = parent.createElementAsLastChild(MbElement.TYPE_NAME);
entry.setName("entry");
for (Map.Entry<String, String> field : elementData.entrySet()) {
entry.createElementAsLastChild(MbXMLNSC.ATTRIBUTE, field.getKey(), field.getValue());
}
}
}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
import com.ibm.broker.plugin.MbUserException;
import com.ibm.broker.plugin.MbXMLNSC;
public class FullouterJoin_JavaCompute extends MbJavaComputeNode {
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
// Copy input headers to output message
MbElement inRoot = inMessage.getRootElement();
MbElement outRoot = outMessage.getRootElement();
MbElement header = inRoot.getFirstChild();
while (header != null && header.getNextSibling() != null) {
outRoot.addAsLastChild(header.copy());
header = header.getNextSibling();
}
// Create output message structure
MbElement outXMLNSC = outRoot.createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement mergedEntries = outXMLNSC.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "entries", null);
try {
// Extract input XML
MbElement leftEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/LeftEntries");
MbElement rightEntries = inMessage.getRootElement().getFirstElementByPath("XMLNSC/Root/RightEntries");
if (leftEntries == null || rightEntries == null) {
throw new MbUserException(this, "evaluate()", "", "", "Missing LeftEntries or RightEntries in input XML", null);
}
// Parse and sort both inputs in ascending order
List<Map<String, String>> leftList = parseAndSortElements(leftEntries, "x", "ascending");
List<Map<String, String>> rightList = parseAndSortElements(rightEntries, "x", "ascending");
// Perform full outer join
fullOuterJoin(leftList, rightList, mergedEntries);
// Propagate the message
out.propagate(outAssembly);
} catch (Exception e) {
e.printStackTrace();
throw new MbUserException(this, "evaluate()", "", "", "Error during processing: " + e.getMessage(), null);
}
}
private List<Map<String, String>> parseAndSortElements(MbElement parent, String key, String order) throws MbException {
List<Map<String, String>> elements = new ArrayList<>();
MbElement child = parent.getFirstChild();
while (child != null) {
Map<String, String> elementData = new HashMap<>();
MbElement attribute = child.getFirstChild();
while (attribute != null) {
elementData.put(attribute.getName(), attribute.getValueAsString());
attribute = attribute.getNextSibling();
}
elements.add(elementData);
child = child.getNextSibling();
}
// Sort based on the specified key and order
Comparator<Map<String, String>> comparator = Comparator.comparingInt(o -> Integer.parseInt(o.get(key)));
if ("descending".equalsIgnoreCase(order)) {
comparator = comparator.reversed();
}
elements.sort(comparator);
return elements;
}
private void fullOuterJoin(
List<Map<String, String>> leftList,
List<Map<String, String>> rightList,
MbElement mergedEntries
) throws MbException {
Map<String, List<Map<String, String>>> leftMap = new HashMap<>();
Map<String, List<Map<String, String>>> rightMap = new HashMap<>();
// Organize leftList by 'x' values
for (Map<String, String> left : leftList) {
leftMap.computeIfAbsent(left.get("x"), k -> new ArrayList<>()).add(left);
}
// Organize rightList by 'x' values
for (Map<String, String> right : rightList) {
rightMap.computeIfAbsent(right.get("x"), k -> new ArrayList<>()).add(right);
}
// Get all unique 'x' values
Set<String> allKeys = new HashSet<>();
allKeys.addAll(leftMap.keySet());
allKeys.addAll(rightMap.keySet());
// Process the full outer join
for (String key : allKeys) {
List<Map<String, String>> leftEntries = leftMap.getOrDefault(key, new ArrayList<>());
List<Map<String, String>> rightEntries = rightMap.getOrDefault(key, new ArrayList<>());
if (!leftEntries.isEmpty() && !rightEntries.isEmpty()) {
// Inner join (when 'x' is present in both lists)
for (Map<String, String> left : leftEntries) {
for (Map<String, String> right : rightEntries) {
Map<String, String> merged = new HashMap<>(left);
merged.putAll(right);
createEntry(mergedEntries, merged);
}
}
} else if (!leftEntries.isEmpty()) {
// Left-only entries (no match in right)
for (Map<String, String> left : leftEntries) {
createEntry(mergedEntries, left);
}
} else {
// Right-only entries (no match in left)
for (Map<String, String> right : rightEntries) {
createEntry(mergedEntries, right);
}
}
}
}
private void createEntry(MbElement parent, Map<String, String> elementData) throws MbException {
MbElement entry = parent.createElementAsLastChild(MbElement.TYPE_NAME);
entry.setName("entry");
for (Map.Entry<String, String> field : elementData.entrySet()) {
entry.createElementAsLastChild(MbXMLNSC.ATTRIBUTE, field.getKey(), field.getValue());
}
}
}
Compute and Java Compute nodes for Lookup activity
About this task
To establish a successful connection to a database, see the Database activity topic.
- JavaCompute node is used for custom or complex lookup logic.
- Compute node is used for simple lookups, using the ESQL code.
Retrieving data from the previous node and using it in the next node
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.*;
import java.sql.*;
public class LookupMsgFlow_JavaCompute extends MbJavaComputeNode {
public void evaluate(MbMessageAssembly assembly) throws MbException {
Connection connection = null;
MbOutputTerminal out = getOutputTerminal("out");
//connection = createIIBJDBCConnection();
// Access the message
try {
connection = createIIBJDBCConnection();
MbMessage inMessage = assembly.getMessage();
// Access the root element of the message tree
MbElement rootElement = inMessage.getRootElement();
// Attempt to access JSON Data
MbElement jsonDataElement = rootElement.getFirstElementByPath("JSON/Data");
System.out.println("jsonDataElement:" +jsonDataElement);
if (jsonDataElement != null) {
MbElement empIDElement = jsonDataElement.getFirstElementByPath("EmpID");
System.out.println("empIDElement:" +empIDElement);
if (empIDElement != null) {
int empID =((Integer) empIDElement.getValue()).intValue();
System.out.println("EmpID: " + empID);
// Perform the lookup
String empName = performLookup(empID,connection);
System.out.println("empName: " + empName);
// Create a new output message
MbMessage outMessage = new MbMessage(inMessage);
MbElement outRoot = outMessage.getRootElement();
outRoot.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "EmpName", empName);
System.out.println("EmpName: " + empName);
} else {
System.out.println("EmpID or EmpName element is missing.");
}
} else {
System.out.println("JSON Data element is null. Check the path.");
}
} finally {
out.propagate(assembly);
}
}
private String performLookup(int empID,Connection connection) throws MbException {
String empName = null;
PreparedStatement statement = null;
ResultSet resultSet = null;
try {
// Prepare and execute the SQL query
String sql = "SELECT EmpName FROM dbo.Employee WHERE EmpID = ?";
statement = connection.prepareStatement(sql);
statement.setLong(1, empID);
resultSet = statement.executeQuery();
// Process the result
if (resultSet.next()) {
empName = resultSet.getString("EmpName");
}
} catch (SQLException e) {
// Log the exception and throw a RuntimeException or handle accordingly
System.err.println("SQL error occurred during lookup: " + e.getMessage());
throw new RuntimeException("Error during database lookup", e);
}
return empName;
}
public Connection createIIBJDBCConnection() throws MbException {
return getJDBCType4Connection("{MSSQL}:MSSQLPolicy", JDBC_TransactionType.MB_TRANSACTION_AUTO)
}
}
CREATE PROCEDURE ComputeLookup()
BEGIN
-- Declare variables
DECLARE EmpID INTEGER;
DECLARE EmpName CHARACTER;
DECLARE Flag CHARACTER;
DECLARE SQLState CHARACTER;
DECLARE SQLCode INTEGER;
DECLARE ErrorText CHARACTER;
-- Read EmpID from Input JSON
SET EmpID = InputRoot.JSON.Data.EmpID;
-- Check if EmpID is not NULL
IF EmpID IS NOT NULL THEN
-- Query the database
BEGIN
DECLARE EmpCursor CURSOR FOR
SELECT EmpName, Flag FROM dbo.Employee WHERE EmpID = EmpID;
OPEN EmpCursor;
FETCH EmpCursor INTO EmpName, Flag;
-- Check if data exists
IF SQLCODE = 0 THEN
-- Add the result to the Output message
SET OutputRoot.JSON.Data.EmpID = EmpID;
SET OutputRoot.JSON.Data.EmpName = EmpName;
SET OutputRoot.JSON.Data.Flag = Flag;
ELSE
-- Handle the case when no matching record is found
SET OutputRoot.JSON.Data.Error = 'Employee not found';
END IF;
CLOSE EmpCursor;
END;
-- Handle SQL exceptions
IF SQLSTATE <> '00000' THEN
SET ErrorText = 'Database error: SQLState=' || SQLSTATE || ', SQLCode=' || SQLCODE;
THROW USER EXCEPTION VALUES(SQLSTATE, ErrorText);
END IF;
ELSE
-- Handle missing EmpID
SET OutputRoot.JSON.Data.Error = 'EmpID is missing in the input';
END IF;
END;
In IBM App Connect Professional a JDBC data source is used for database connectivity. In IBM App Connect Enterprise, an ODBC data source is used.
- Use the MbMessage or MbElement objects to retrieve data from the incoming message tree (InputRoot).
- If the message is in XML, JSON, or another format, parse it appropriately.
- Use a JDBC connection to query the database.
- Use the key from the previous node as a parameter in the SQL query.
- Add the retrieved value to the OutputRoot.
Example 1:
<Root>
<Employee>
<EmpID>101</EmpID>
</Employee>
</Root>
import com.ibm.broker.plugin.*;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class EmployeeLookupComputeNode extends MbJavaComputeNode {
@Override
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
MbOutputTerminal out = getOutputTerminal("out");
MbMessage inMessage = inAssembly.getMessage();
MbMessage outMessage = new MbMessage();
MbMessageAssembly outAssembly = new MbMessageAssembly(inAssembly, outMessage);
Connection connection = null;
PreparedStatement statement = null;
ResultSet resultSet = null;
try {
// Copy the input message to the output message
outMessage = new MbMessage(inMessage);
// Extract the EmpID from the input XML
MbElement inputRoot = inMessage.getRootElement();
MbElement empIdElement = inputRoot.getFirstElementByPath("/XMLNSC/Root/Employee/EmpID");
String empId = empIdElement != null ? empIdElement.getValueAsString() : null;
// Default values for EmpName and Flag
String empName = "NotFound";
String flag = "N/A";
if (empId != null) {
// Get a JDBC connection using the specified policy
connection = getJDBCType4Connection("JDBC_POLICY_NAME");
// Prepare the SQL query
String query = "SELECT EmpName, Flag FROM dbo.Employee WHERE EmpID = ?";
statement = connection.prepareStatement(query);
statement.setString(1, empId);
// Execute the query
resultSet = statement.executeQuery();
if (resultSet.next()) {
// Fetch the results from the query
empName = resultSet.getString("EmpName");
flag = resultSet.getString("Flag");
}
}
// Construct the output XML
MbElement outputRoot = outMessage.getRootElement().createElementAsLastChild(MbXMLNSC.PARSER_NAME);
MbElement employeeElement = outputRoot.createElementAsLastChild("Employee");
employeeElement.createElementAsLastChild("EmpID").setValue(empId);
employeeElement.createElementAsLastChild("EmpName").setValue(empName);
employeeElement.createElementAsLastChild("Flag").setValue(flag);
} catch (Exception e) {
throw new MbUserException(this, "evaluate()", "", "", e.toString(), null);
} finally {
// Close database resources
try {
if (resultSet != null) resultSet.close();
if (statement != null) statement.close();
if (connection != null) connection.close();
} catch (Exception e) {
getLogger().warning("Error closing resources: " + e.toString());
}
}
// Propagate the message to the next node
out.propagate(outAssembly);
}
}
Example 2
{
"EmpID":17
}
package com.ibm.lookup.example;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.*;
import java.sql.*;
public class LookupJavaPrevious_JavaCompute extends MbJavaComputeNode {
@Override
public void evaluate(MbMessageAssembly assembly) throws MbException {
Connection connection = null;
MbOutputTerminal out = getOutputTerminal("out");
try {
connection = createIIBJDBCConnection();
MbMessage inMessage = assembly.getMessage();
// Access the root element of the message tree
MbElement rootElement = inMessage.getRootElement();
// Attempt to access JSON Data
MbElement jsonDataElement = rootElement.getFirstElementByPath("JSON/Data");
System.out.println("jsonDataElement: " + jsonDataElement);
if (jsonDataElement != null) {
MbElement empIDElement = jsonDataElement.getFirstElementByPath("EmpID");
System.out.println("empIDElement: " + empIDElement);
if (empIDElement != null) {
int empID = ((Integer) empIDElement.getValue()).intValue();
System.out.println("EmpID: " + empID);
// Perform the lookup
EmployeeData employeeData = performLookup(empID, connection);
if (employeeData != null) {
System.out.println("EmpName: " + employeeData.getEmpName());
System.out.println("Flag: " + employeeData.getFlag());
// Create a new output message
MbMessage outMessage = new MbMessage(inMessage);
MbElement outRoot = outMessage.getRootElement();
outRoot.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "EmpName", employeeData.getEmpName());
outRoot.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Flag", employeeData.getFlag());
// Propagate the message
MbMessageAssembly outAssembly = new MbMessageAssembly(assembly, outMessage);
out.propagate(outAssembly);
} else {
System.out.println("No data found for EmpID: " + empID);
}
} else {
System.out.println("EmpID element is missing.");
}
} else {
System.out.println("JSON Data element is null. Check the path.");
}
} finally {
}
}
private EmployeeData performLookup(int empID, Connection connection) throws MbException {
EmployeeData employeeData = null;
String sql = "SELECT EmpName, Flag FROM dbo.Employee WHERE EmpID = ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setInt(1, empID);
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
String empName = resultSet.getString("EmpName");
String flag = resultSet.getString("Flag");
employeeData = new EmployeeData(empName, flag);
}
}
} catch (SQLException e) {
System.err.println("SQL error occurred during lookup: " + e.getMessage());
throw new RuntimeException("Error during database lookup", e);
}
return employeeData;
}
public Connection createIIBJDBCConnection() throws MbException {
return getJDBCType4Connection("{MSSQL}:MSSQLPolicy", JDBC_TransactionType.MB_TRANSACTION_AUTO);
}
// Inner class to encapsulate employee data
private static class EmployeeData {
private final String empName;
private final String flag;
public EmployeeData(String empName, String flag) {
this.empName = empName;
this.flag = flag;
}
public String getEmpName() {
return empName;
}
public String getFlag() {
return flag;
}
}
}
Filter and Compute nodes for Filter and Profile activity
About this task
Retrieving data from the previous node and using it in the next node
Filter node example:
Input JSON
{
"OrderID": "12345",
"OrderStatus": "Approved",
"Customer": {
"Name": "John Doe",
"Email": "john.doe@example.com"
}
In IBM App Connect Professional a JDBC data source is used for database connectivity. In IBM App Connect Enterprise, an ODBC data source is used.
In IBM App Connect Professional, data from the previous node is mapped using Map Inputs and Map Outputs in the Checklist. In IBM App Connect Enterprise, a JavaCompute is used to access data from the previous node.
{
"data": [
{"id": 1, "name": "Item 1", "status": "active"},
{"id": 2, "name": "Item 2", "status": "inactive"},
{"id": 3, "name": "Item 3", "status": "active"}
]
}
package com.ibm.filter.example;
import java.util.ArrayList;
import java.util.List;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
public class JavaComputeFilter_JavaCompute extends MbJavaComputeNode {
@Override
public void evaluate(MbMessageAssembly assembly) {
MbOutputTerminal out = getOutputTerminal("out");
try {
// Create a copy of the input message
MbMessage inMessage = assembly.getMessage();
MbMessage outMessage = new MbMessage(inMessage);
MbElement rootElement = outMessage.getRootElement();
// Access JSON data
MbElement jsonDataElement = rootElement.getFirstElementByPath("JSON/Data");
List<MbElement> list = new ArrayList<>();
if (jsonDataElement != null) {
MbElement dataArrayElement = jsonDataElement.getFirstElementByPath("data");
MbElement itemElement = dataArrayElement.getFirstChild();
while (itemElement != null) {
String status = itemElement.getFirstElementByPath("status").getValueAsString();
if ("active".equals(status)) {
// Clone and add the element to the filtered array
list.add(itemElement.copy());
System.out.println("list:" + list);
}
itemElement = itemElement.getNextSibling();
System.out.println("listsize:" + list.size());
}
// Replace the existing "data" element with the list
if (!list.isEmpty()) {
// Remove old "data" element
dataArrayElement.detach();
// Create a new "data" element with the filtered data
MbElement newDataElement = jsonDataElement.createElementAsLastChild(MbElement.TYPE_NAME, "data",
null);
for (MbElement element : list) {
newDataElement.addAsLastChild(element);
}
}
}
// Propagate the modified message
MbMessageAssembly outAssembly = new MbMessageAssembly(assembly, outMessage);
out.propagate(outAssembly);
} catch (Exception e) {
System.err.println("An error occurred: " + e.getMessage());
e.printStackTrace();
}
}
}
{
"data": [
{"id": 1, "name": "Item 1", "status": "active"},
{"id": 3, "name": "Item 3", "status": "active"}
]
}
Profile activity:
Example: Profiling customer orders
{
"Orders": [
{ "OrderID": "123", "Amount": 100, "Status": "Completed" },
{ "OrderID": "124", "Amount": 200, "Status": "Pending" },
{ "OrderID": "125", "Amount": 150, "Status": "Completed" }
]
}
CREATE COMPUTE MODULE ProfileSummary_Compute
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
DECLARE totalAmount INTEGER 0;
DECLARE orderCount INTEGER 0;
DECLARE completedCount INTEGER 0;
DECLARE minAmount INTEGER NULL;
DECLARE maxAmount INTEGER NULL;
-- Loop through orders
FOR order AS InputRoot.JSON.Data.Orders[] DO
SET totalAmount = totalAmount + order.Amount;
SET orderCount = orderCount + 1;
-- Count completed orders
IF order.Status = 'Completed' THEN
SET completedCount = completedCount + 1;
END IF;
-- Calculate min and max amounts
IF minAmount IS NULL OR order.Amount < minAmount THEN
SET minAmount = order.Amount;
END IF;
IF maxAmount IS NULL OR order.Amount > maxAmount THEN
SET maxAmount = order.Amount;
END IF;
END FOR;
-- Set summary in OutputRoot
SET OutputRoot.JSON.Data.ProfileSummary.TotalAmount = totalAmount;
SET OutputRoot.JSON.Data.ProfileSummary.OrderCount = orderCount;
SET OutputRoot.JSON.Data.ProfileSummary.CompletedCount = completedCount;
SET OutputRoot.JSON.Data.ProfileSummary.MinAmount = minAmount;
SET OutputRoot.JSON.Data.ProfileSummary.MaxAmount = maxAmount;
RETURN TRUE;
END;
END MODULE;
{
"ProfileSummary": {
"TotalAmount": 450,
"OrderCount": 3,
"CompletedCount": 2,
"MinAmount": 100,
"MaxAmount": 200
}
}