FIFO Message Processing Enhancement
Sterling B2B Integrator supports FIFO message processing through the following adapters:
- JMS Queue adapter
- JMS Topic adapter
- MSMQ adapter
Ordered processing in Sterling B2B Integrator is handled by the FIFO (first in, first out) framework.
The following figure demonstrates the FIFO framework:
Messages that adapters pass to the FIFO framework are first run through a specialized routing key initialization business process, which returns a single string value known as the routing key. The routing logic then places all messages with equal keys on the same internal routing queue. Messages with different routing key values are processed in parallel, while messages with the same routing key value maintain FIFO ordering. Each queue passes a message to the user-specified business process, waits for that business process to end, and then processes the next message. If an error is encountered while processing a message, metadata describing the errant process is routed to an error queue, and message processing then continues.
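The routing behavior described above can be illustrated with a minimal Python sketch. It is not the product's implementation: the function names route_by_key and drain, the tuple-shaped messages, and the in-memory queues are all hypothetical, and the queues are drained one after another here, whereas the framework processes queues with different keys in parallel.

```python
from collections import OrderedDict, deque


def route_by_key(messages, routing_key):
    """Place each message on the internal queue that matches its routing key.

    routing_key stands in for the routing key business process: it takes a
    message and returns a single string value.
    """
    queues = OrderedDict()
    for message in messages:
        queues.setdefault(routing_key(message), deque()).append(message)
    return queues


def drain(queue, process, error_queue):
    """Process one queue strictly in arrival order.

    A failing message is recorded on the error queue and processing
    continues with the next message.
    """
    results = []
    while queue:
        message = queue.popleft()
        try:
            results.append(process(message))
        except Exception as exc:  # illustration only: any failure is "errant"
            error_queue.append({"message": message, "error": str(exc)})
    return results


if __name__ == "__main__":
    incoming = [("A", 1), ("B", 1), ("A", 2), ("B", 2)]
    errors = []
    for key, queue in route_by_key(incoming, lambda m: m[0]).items():
        print(key, drain(queue, lambda m: m[1], errors))
```

Within each key the sequence numbers come out in the order they arrived, which is the ordering guarantee the paragraph describes.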
Configuring external Active MQ for FIFO message processing
- Download the latest supported version of Active MQ from Apache Archives. Click the link labeled Archive of public software releases and select activemq/ > apache-activemq/ > <latest version>.
- Install Active MQ in a directory separate from Sterling B2B Integrator. You can also install it on a separate host.
  If you download the tar file for a UNIX installation, run the following command:
  tar xvf apache-activemq-<latest version>-bin.tar
  This command creates the apache-activemq-<latest version> directory and its subdirectories.
- Place a copy of <SI Root>/properties/activemq.xml in the apache-activemq-<latest version>/conf directory.
  Note: Active MQ uses a Derby database. To configure Active MQ with Sterling B2B Integrator, you must copy the database driver from <SI Root>/dbjar/jdbc/<db vendor>/<db driver jar> and place it in apache-activemq-<latest version>/lib.
  If the system does not populate the url and the driverClassName in the activemq.xml file, you must manually add the following properties and their values.
  Note: The following values are specific to the MSSQL database.
  <property name="driverClassName">
    <value>com.microsoft.sqlserver.jdbc.SQLServerDriver</value>
  </property>
  <property name="url">
    <value>jdbc:sqlserver://<DB_HOST>:<PORT>;databaseName=<DBNAME>;SelectMethod=cursor</value>
  </property>
  Example of an activemq.xml file
  Note: The example below uses ActiveMQ v5.15.9.
  <beans default-lazy-init="false" default-dependency-check="none" default-autowire="no"
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:amq="http://activemq.org/config/1.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://activemq.org/config/1.0
        http://activemq.apache.org/schema/activemq-core.xsd
        http://activemq.apache.org/camel/schema/spring
        http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <property name="locations">
        <value>file:///export/home/paulb/apache-activemq-5.15.9/conf/credentials.properties</value>
      </property>
    </bean>

    <broker xmlns="http://activemq.org/config/1.0" brokerName="external_atlanta_28565"
        dataDirectory="/apps/l2/apache-activemq-5.15.9/data" useJmx="true">
      <!--
      <plugins>
        <simpleAuthenticationPlugin>
          <users>
            <authenticationUser username="jmsadmin" password="password" groups="users"/>
          </users>
        </simpleAuthenticationPlugin>
        <authorizationPlugin>
          <map>
            <authorizationMap>
              <authorizationEntries>
                <authorizationEntry queue=">" read="users" write="users" admin="users"/>
                <authorizationEntry topic=">" read="users" write="users" admin="users"/>
              </authorizationEntries>
            </authorizationMap>
          </map>
        </authorizationPlugin>
      </plugins>
      -->
      <destinationPolicy>
        <policyMap>
          <policyEntries>
            <!-- FIFO requires specific policy entries due to its request/response nature.
                 There must be space set aside for the response queues to avoid a deadlock -->
            <policyEntry queue="RESPONSE.FIFO.GIS.>" memoryLimit="8mb"/>
            <policyEntry queue="FIFO.GIS.>" memoryLimit="64mb"/>
            <policyEntry queue=">" memoryLimit="128m" />
            <policyEntry topic=">" memoryLimit="128m" />
          </policyEntries>
        </policyMap>
      </destinationPolicy>
      <systemUsage>
        <systemUsage>
          <memoryUsage>
            <memoryUsage limit="512m"/>
          </memoryUsage>
        </systemUsage>
      </systemUsage>
      <transportConnectors>
        <transportConnector name="external_atlanta_28565" uri="tcp://0.0.0.0:28565" />
      </transportConnectors>
      <!-- default network connector
      <networkConnectors>
        <networkConnector name="node161616" uri="multicast://default"/>
      </networkConnectors>
      -->
      <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#derby-ds" useDatabaseLock="true" createTablesOnStartup="true">
          <statements>
            <statements tablePrefix="EXT_"/>
          </statements>
        </jdbcPersistenceAdapter>
      </persistenceAdapter>
      <managementContext>
        <managementContext connectorPort="28566"/>
      </managementContext>
    </broker>

    <!-- ==================================================================== -->
    <!-- JDBC DataSource Configurations -->
    <!-- ==================================================================== -->
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
      <property name="databaseName" value="derbydb"/>
      <property name="createDatabase" value="create"/>
    </bean>

    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
      <connectors>
        <nioConnector port="28565" />
      </connectors>
      <handlers>
        <webAppContext contextPath="/admin"
            resourceBase="/apps/l2/apache-activemq-5.15.9/webapps/admin" logUrlOnStart="true"/>
      </handlers>
    </jetty>
  </beans>
- Install
activemq jars on Sterling B2B Integrator using the Install3rdParty script.
  - Copy the jars from the apache-activemq-<latest version>/lib and apache-activemq-<latest version>/lib/optional folders to a temporary directory on the host where Sterling B2B Integrator is installed.
  - Remove the slf4j-api, jcl-over-slf4j, and log4j jars from the temporary directory.
  - Install the activemq jars that you copied into the temporary directory as third-party jars in Sterling B2B Integrator. For example:
    ./install3rdParty.sh activemq <activemqversion> -jar <path to temporary directory>/*.jar
    After you run this command, the following directory is created within Sterling B2B Integrator and contains all the activemq jars:
    <SI Root>/jar/activemq/<activemqversion>
- Modify the fifo.properties.in file as shown below to transfer the FIFO functionality to an external Active MQ instance.
  fifo.broker.url=tcp://10.55.52.29:28565?wireFormat.maxInactivityDuration=0
  fifo.adapterqueue.broker.url=tcp://10.55.52.29:28565?wireFormat.maxInactivityDuration=0
  #fifo.broker.url=tcp://&HOST_NAME;:&ACTIVEMQ_PORT;?wireFormat.maxInactivityDuration=0
  #fifo.adapterqueue.broker.url=tcp://&HOST_NAME;:&ACTIVEMQ_PORT;?wireFormat.maxInactivityDuration=0
  Note: In the example, the two lines of the original file are commented out and replaced by lines that point to the external Active MQ instance.
- Add the following line to the .profile of the user who owns the directory where Active MQ is installed on UNIX:
  export ACTIVEMQ_HOME=<directory where ActiveMQ is installed>/apache-activemq-<latest version>
  In addition, you must modify apache-activemq-<latest version>/conf/credentials.properties to use the credentials defined in Sterling B2B Integrator.
  # activemq.username=system
  # activemq.password=manager
  activemq.username=jmsadmin
  activemq.password=password
  Note: The uncommented lines above are the default values from Sterling B2B Integrator.
- Start the external Active MQ instance using the UNIX nohup command, so that Active MQ continues to run after the user logs out.
  apache-activemq-<latest version>/bin>nohup activemq &
  You can also start the Active MQ instance without the nohup command. To do so, type activemq at the command prompt of the ActiveMQ/bin directory:
  /export/home/paulb/apache-activemq-<latest version>/bin>activemq
  Note: The example below uses ActiveMQ v5.15.9.
  ACTIVEMQ_HOME: /export/home/paulb/apache-activemq-5.15.9
  ACTIVEMQ_BASE: /export/home/paulb/apache-activemq-5.15.9
  Loading message broker from: xbean:activemq.xml
  INFO BrokerService - Using Persistence Adapter: JDBCPersistenceAdaptor(org.apache.derby.jdbc.EmbeddedDataSource@1557c0)
  INFO JDBCPersistenceAdapter - Database driver recognized: [apache_derby_embedded_jdbc_driver]
  INFO DefaultDatabaseLocker - Attempting to acquire the exclusive lock to become the Master broker
  INFO DefaultDatabaseLocker - Becoming the master on dataSource: org.apache.derby.jdbc.EmbeddedDataSource@1557c0
  INFO BrokerService - ActiveMQ 5.15.9 JMS Message Broker (external_atlanta_28565) is starting
  INFO BrokerService - For help or more information please see:http://activemq.apache.org/
  INFO ManagementContext - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:28566/jmxrmi
  INFO TransportServerThreadSupport - Listening for connections at: tcp://0.0.0.0:28565
  INFO TransportConnector - Connector external_atlanta_28565 Started
  INFO BrokerService - ActiveMQ JMS Message Broker (external_atlanta_28565, id: atlanta-62386-1362442235480-0:0) started
  INFO log - Logging to org.slf4j.impl.JCLLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
  INFO log - jetty-6.1.9
  INFO WebConsoleStarter - ActiveMQ WebConsole initialized.
  INFO /admin - Initializing Spring FrameworkServlet 'dispatcher'
  INFO log - ActiveMQ Console at http://0.0.0.0:28565/admin
  INFO log - Started SelectChannelConnector@0.0.0.0:28565
  WARN BrokerRegistry - Broker localhost not started so using external_atlanta_28565 instead
  INFO TransportConnector - Connector vm://localhost Started
- Start FIFO Error Queue Listener and FIFO Routing Service to enable FIFO processing within Sterling B2B Integrator.
- Configure a JMS Queue Adapter.
  testJMSQueueAdatper Service Settings
  Service Type                       JMS Queue Adapter
  Description                        Version 1
  System Name                        testJMSQueueAdatper
  Group Name                         None
  Connection Type                    Using Jndi
  Initial Context Factory            org.apache.activemq.jndi.ActiveMQInitialContextFactory
  URL                                tcp://10.55.52.29:28565
  Remote Queue Name                  dynamicQueues/workflow.taskqueue.simpleOB
  Remote Queue Connection Factory    ConnectionFactory
  Remote User Name                   None provided
  Remote Password                    None provided
  Connection User Name               None provided
  Connection Password                None provided
  Turn on debug messages?            Yes
  Queue Type                         Queue Receive Async
  Bootstrap Workflow                 Simple_OB_with_Immediate_Enveloping
  Document Storage Type              System Default
  Buffer Size                        None provided
  Document Filename                  None provided
  Connection retry attempts          None provided
  Delay between retries              None provided
  Notification Workflow              None provided
  Process Execution Mode             FIFO
  Fifo Routing BP                    getFifoRoutingKey
  Jar Locations                      None provided
  Note: The Remote Queue name begins with dynamicQueues. The BP queue naming convention is workflow.taskqueue.<name of your own choosing>.
  Sample Business Processes:
  Simple_OB_with_Immediate_Enveloping
  <process name="Simple_OB_with_Immediate_Enveloping">
    <sequence name="begin_the_beguine">
      <operation name="EDI Encoder">
        <participant name="EDIEncoder"/>
        <output message="EDIEncoderTypeInputMessage">
          <assign to="AccepterLookupAlias">850</assign>
          <assign to="EDIStandard">X12</assign>
          <assign to="Mode">IMMEDIATE</assign>
          <assign to="ReceiverID">ACMEPROD</assign>
          <assign to="SenderID">TECHSUPP</assign>
          <assign to="." from="*"></assign>
        </output>
        <input message="inmsg">
          <assign to="." from="*"></assign>
        </input>
      </operation>
      <operation name="EDI Envelope">
        <participant name="EDIEnvelope"/>
        <output message="EDIEnvelopeTypeInputMessage">
          <assign to="HALT_ON_TRANS_ERROR">true</assign>
          <assign to="MODE">IMMEDIATE</assign>
          <assign to="ReceiverID">ACMEPROD</assign>
          <assign to="SenderID">TECHSUPP</assign>
          <assign to="." from="*"></assign>
        </output>
        <input message="inmsg">
          <assign to="." from="*"></assign>
        </input>
      </operation>
      <operation name="Invoke Business Process Service">
        <participant name="InvokeBusinessProcessService"/>
        <output message="InvokeBusinessProcessServiceTypeInputMessage">
          <assign to="INVOKE_MODE">SYNC</assign>
          <assign to="." from="*"></assign>
        </output>
        <input message="inmsg">
          <assign to="." from="*"></assign>
        </input>
      </operation>
    </sequence>
  </process>
  getFifoRoutingKey
  <process name="getFifoRoutingKey">
    <sequence>
      <operation name="Typing">
        <participant name="TypingService"/>
        <output message="TypingServiceTypeInputMessage">
          <assign to="required_parmlist">FifoRoutingKey</assign>
          <assign to="typing_maplist">getFifoRoutingKey</assign>
          <assign to="validate_input_against_dtd">NO</assign>
          <assign to="." from="*"></assign>
        </output>
        <input message="inmsg">
          <assign to="." from="*"></assign>
        </input>
      </operation>
    </sequence>
  </process>
  Note: The getFifoRoutingKey business process must be checked in to Sterling B2B Integrator with the BP Queueing box unchecked and the Event Reporting Level set to Minimal or above, to avoid errors when starting or running the JMS Queue Adapter.
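The jar-staging step in the procedure above (copy the jars from lib and lib/optional to a temporary directory, then remove the slf4j-api, jcl-over-slf4j, and log4j jars) can be sketched in Python as follows. This is only an illustration under stated assumptions: the helper names select_jars and stage_jars are hypothetical, and matching the excluded jars by file-name prefix is an assumption about how those jars are named in your Active MQ distribution. The install3rdParty.sh script is still run afterwards as described in the procedure.

```python
import shutil
from pathlib import Path

# Assumption: the jars to leave out can be recognised by these name prefixes.
EXCLUDED_PREFIXES = ("slf4j-api", "jcl-over-slf4j", "log4j")


def select_jars(jar_names):
    """Return the sorted jar names that should be staged for installation."""
    return sorted(
        name
        for name in jar_names
        if name.endswith(".jar") and not name.startswith(EXCLUDED_PREFIXES)
    )


def stage_jars(activemq_home, staging_dir):
    """Copy the selected jars from lib and lib/optional into staging_dir."""
    home = Path(activemq_home)
    target = Path(staging_dir)
    target.mkdir(parents=True, exist_ok=True)

    candidates = {}
    for folder in (home / "lib", home / "lib" / "optional"):
        if folder.is_dir():
            for jar in folder.glob("*.jar"):
                candidates[jar.name] = jar

    staged = select_jars(candidates)
    for name in staged:
        shutil.copy2(candidates[name], target / name)
    return staged
```

Calling stage_jars with your ACTIVEMQ_HOME directory and a temporary directory of your choosing returns the list of jar names that were copied, which you can review before running the install script.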
Configuring FIFO Adapter
You can configure the FIFO adapter to work with an external WebSphere MQ or an external ActiveMQ instance for IIM-based installations.
IIM-based installation
The WebSphere MQ or ActiveMQ connection parameters must be configured after the installation, either through the customer override customization UI or in the customer_override.properties file, by overriding the following properties from the fifo.properties file.
1. fifo.broker.username=(WMQ/ActiveMQ user name)
2. fifo.broker.password=(WMQ/ActiveMQ password)
3. fifo.broker.host=(WMQ/ActiveMQ Server host name)
4. fifo.broker.port=(WMQ/ActiveMQ Server port number)
#Optional for ActiveMQ
5. fifo.broker.channel=(WMQ channel)
6. fifo.broker.connectionNameList=(WMQ Connection name list for a WMQ cluster setup)
7. fifo.adapterqueue.broker.username=(WMQ/ActiveMQ user name)
8. fifo.adapterqueue.broker.password=(WMQ/ActiveMQ password)
9. fifo.adapterqueue.broker.host=(WMQ/ActiveMQ Server host name)
10. fifo.adapterqueue.broker.port=(WMQ/ActiveMQ Server port number)
#Optional for ActiveMQ
11. fifo.adapterqueue.broker.channel=(WMQ channel)
12. fifo.adapterqueue.broker.connectionNameList=(WMQ Connection name list for a WMQ cluster setup)
13. fifo.jms.vendor=(activemq/ibmmq)
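As a quick sanity check before you apply the overrides, the list above can be turned into a small Python sketch that reports which properties are still unset. It rests on several assumptions that the source does not confirm: the helper names parse_properties and missing_overrides are hypothetical, the keys are written exactly as they appear in the list (any key prefix that customer_override.properties may require is not modelled), an empty value is treated as missing, the channel properties are treated as required only when the vendor is ibmmq, and connectionNameList is treated as optional because it applies only to a WMQ cluster setup.

```python
BROKER_PREFIXES = ("fifo.broker", "fifo.adapterqueue.broker")
COMMON_FIELDS = ("username", "password", "host", "port")
VENDORS = ("activemq", "ibmmq")


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def missing_overrides(props):
    """Return the listed properties that are absent or empty."""
    vendor = props.get("fifo.jms.vendor", "")
    if vendor not in VENDORS:
        # Without a valid vendor the remaining checks cannot be chosen.
        return ["fifo.jms.vendor"]

    required = [f"{p}.{f}" for p in BROKER_PREFIXES for f in COMMON_FIELDS]
    if vendor == "ibmmq":
        # Assumption: the channel is needed for WebSphere MQ only.
        required += [f"{p}.channel" for p in BROKER_PREFIXES]
    return [key for key in required if not props.get(key)]
```

For example, passing the parsed contents of a draft override file to missing_overrides returns an empty list when every property expected for the chosen vendor has a value.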