As a developer, you can integrate IBM Business Automation Manager Open Editions with Red Hat AMQ Streams or Apache Kafka. A business process can send and receive Kafka messages.
Kafka messages in a business process
Red Hat AMQ Streams, based on Apache Kafka, is a streaming platform. It acts as a message broker, passing messages, which are sorted into topics, between applications in a software environment.
Using IBM Business Automation Manager Open Editions, you can create business processes that send and receive Kafka messages by using the following methods:
- Create a start event, intermediate catch event, or boundary event (attached to a human task) of the message type. KIE Server automatically subscribes to the Kafka topic that is defined in the message. A message triggers the event. The event node acts as the consumer of the message and can pass the content of the message to the subsequent node in the process.
- Create an end event or intermediate throw event of the message type. When the process triggers the event, KIE Server sends a Kafka message to the topic that is defined in the message. The message contains the data that is configured in the event. The event node acts as the producer of the message.
- Add the KafkaPublishMessages custom task to the process. This task does not require the KIE Server Kafka capability but can be more complicated to configure than message events.
- Configure your service and KIE Server with an emitter to automatically send Kafka messages about every completed process, case, and task when transactions are committed.
Creating an event that receives Kafka messages
When designing your business process in Business Central, you can create an event that receives Kafka messages.
This event is triggered each time a message arrives in the configured topic. The message must contain data that matches a predefined data object. The process engine parses the message and provides it as an output of the event.
- You are logged in to Business Central and have permission to edit business processes.
- You created a business process.
- In Business Central, open the project that contains your business process.
- Create a data object defining the data that the message must contain. For instructions about creating data objects, see Designing business processes using BPMN models.
- Select the business process and open the business process designer.
- Add a start event, an intermediate catch event, or a boundary event that is attached to a human task. The event must have the message type.
- Open the properties of the event.
- In the Message field, select New and then enter the name of the message. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else must be defined as topic-name in an org.kie.server.jbpm-kafka.ext.topics.topic-name KIE Server system property. For instructions about using org.kie.server.jbpm-kafka.ext.topics.* system properties to define topic names, see Configuring KIE Server to send and receive Kafka messages from the process.
- Add an output data item and set the type of this data item to the data object that you created.
- Save the business process.
Note
If a Kafka message event starts a new process instance, the
To enable Red Hat AMQ Streams integration when running the process, you must configure KIE Server. For more information about configuring KIE Server for Red Hat AMQ Streams integration, see Configuring KIE Server to send and receive Kafka messages from the process.
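If the message name cannot match the topic name, the mapping can be supplied as a KIE Server system property when the server starts. A minimal sketch, in which the message name IncomingOrders and the topic name orders-topic are hypothetical:

```
# Hypothetical example: the event's message is named "IncomingOrders",
# but the Kafka topic it receives messages from is named "orders-topic"
-Dorg.kie.server.jbpm-kafka.ext.topics.IncomingOrders=orders-topic
```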
Creating an event that sends Kafka messages
When designing your business process in Business Central, you can create an event that sends Kafka messages.
The event can have a data object as an input data item. The process engine sends the content of a data object as a message in the configured topic.
- You are logged in to Business Central and have permission to edit business processes.
- You created a business process.
- In Business Central, open the project that contains your business process.
- Create a data object defining the data that the message must contain. For instructions about creating data objects, see Designing business processes using BPMN models.
- Select the business process and open the business process designer.
- Add an intermediate throw event or an end event of the message type.
- Open the properties of the event.
- In the Message field, select New and then enter the name of the message. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else must be defined as topic-name in an org.kie.server.jbpm-kafka.ext.topics.topic-name KIE Server system property. For instructions about using org.kie.server.jbpm-kafka.ext.topics.* system properties to define topic names, see Configuring KIE Server to send and receive Kafka messages from the process.
- Add an input data item and set the type of this data item to the data object that you created.
- Save the business process.
To enable Red Hat AMQ Streams integration when running the process, you must configure KIE Server. For more information about configuring KIE Server for Red Hat AMQ Streams integration, see Configuring KIE Server to send and receive Kafka messages from the process.
Adding a custom task that sends Kafka messages
You can add a KafkaPublishMessages custom task to your process. This task sends Kafka messages. It does not use the KIE Server Kafka capability, so you can use this task in processes that do not run on a KIE Server. However, this task can be more complicated to configure than other Red Hat AMQ Streams integration options.
- You are logged in to Business Central as an administrative user.
- In the Business Central administrative settings menu, select Custom Tasks Administration.
- Ensure that KafkaPublishMessages is set to On.
- In Business Central, select Menu → Design → Projects and then click the space name and the project name.
- Select the Settings → Custom Tasks tab.
- In the KafkaPublishMessages line, click Install.
- Optional: Enter the following information. If you leave any of the fields empty, the process engine uses the default values for these fields.
  - Bootstrap Servers: The host and port of the Kafka broker. You can use a comma-separated list of multiple host:port pairs. The default value is localhost:9092.
  - Client ID: An identifier string to pass to the broker when making requests. Red Hat AMQ Streams uses this string for logging.
  - Key Serializer class: The class that provides the key serializer. The default value is org.apache.kafka.common.serialization.StringSerializer. You can enter a different value if you want to use your own custom serializer class.
  - Value Serializer class: The class that provides the value serializer. The default value is org.apache.kafka.common.serialization.StringSerializer. You can enter a different value if you want to use your own custom serializer class.
  In any of these fields, you can enter an env[property] value. In this case, the process engine reads the setting from a system property at runtime. For example, you can set Client ID to env[application.client.id] and then, before running the process service, set the client ID value in the application.client.id system property.
- If you entered the name of a custom serializer class, complete the following additional configuration:
  - Select the Settings → Deployments → Work Item Handlers tab.
  - In the KafkaPublishMessages line, modify the Value field to add the classLoader parameter. For example, the initial value of this field can be the following string:
    new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer")
    In this example, change the value to the following string:
    new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer", classLoader)
Select the Assets tab.
-
Select the business process and open the business process designer.
-
Add the
KafkaPublishMessages
custom task, available under Custom Tasks in the BPMN modeler palette. -
In the properties of the custom task, open the data assignments.
-
Assign the Key, Topic, and Value inputs to define the message.
If you entered a custom serializer class, you must provide this class to your business application. For instructions about providing custom classes to your business application, see Providing a custom class to your business application in Business Central.
Configuring KIE Server to send and receive Kafka messages from the process
To run a process that sends or receives Kafka messages using events, you must use KIE Server. You must configure the KIE Server instance to integrate with Red Hat AMQ Streams.
- A KIE Server instance is installed.
- To enable integration with Red Hat AMQ Streams, set the following system properties according to your environment:
  - If you are using KIE Server on Red Hat JBoss EAP, set the org.kie.kafka.server.ext.disabled KIE Server system property to false.
  - If you are using Spring Boot, set the kieserver.kafka.enabled system property to true.
- To configure the connection to the Kafka broker, set the org.kie.server.jbpm-kafka.ext.bootstrap.servers system property to the host and port of the broker. The default value is localhost:9092. You can use a comma-separated list of multiple host:port pairs.
- Optional: Set any of the following system properties related to both sending and receiving Kafka messages:
Table 1. Optional KIE Server system properties related to both sending and receiving Kafka messages
- org.kie.server.jbpm-kafka.ext.client.id: An identifier string to pass to the broker when making requests. Red Hat AMQ Streams uses this string for logging.
- org.kie.server.jbpm-kafka.ext.topics.*: Mapping of message names to topic names. For example, if you want to send or receive a message in the ExampleTopic topic when ExampleName is the name of the message, set the org.kie.server.jbpm-kafka.ext.topics.ExampleName system property to ExampleTopic. You can set any number of such system properties. If a message name is not mapped using a system property, the process engine uses this name as the topic name.
- org.kie.server.jbpm-kafka.ext.property_name: Set any Red Hat AMQ Streams consumer or producer property by using the org.kie.server.jbpm-kafka.ext prefix. For example, to set a value for the buffer.memory producer property, set the org.kie.server.jbpm-kafka.ext.buffer.memory KIE Server system property. This setting applies to all processes that send or receive Kafka messages using events on this KIE Server. For a list of Red Hat AMQ Streams consumer and producer properties, see the Consumer configuration parameters and Producer configuration parameters appendixes in Using AMQ Streams on RHEL.
- Optional: Set any of the following system properties related to receiving Kafka messages:
Table 2. Optional KIE Server system properties related to receiving Kafka messages
- org.kie.server.jbpm-kafka.ext.allow.auto.create.topics: Allow automatic topic creation. Default value: true
- org.kie.server.jbpm-kafka.ext.group.id: A unique string that identifies the group to which this Kafka message consumer belongs. Default value: jbpm-consumer
- Optional: Set any of the following system properties related to sending Kafka messages:
Table 3. Optional KIE Server system properties related to sending Kafka messages
- org.kie.server.jbpm-kafka.ext.acks: The number of acknowledgements that the Kafka leader must receive before marking the request as complete. Default value: 1. This value means the leader writes the record to its local log and then responds to the process engine, without waiting for full acknowledgement from all followers.
- org.kie.server.jbpm-kafka.ext.max.block.ms: The number of milliseconds for which the publish method blocks. After this time, the process engine can resume execution of the business process. Default value: 2000 (2 seconds).
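Taken together, the steps above might translate into JVM options like the following when starting KIE Server on Red Hat JBoss EAP. The broker addresses and property values shown are illustrative assumptions, not defaults:

```
# Illustrative configuration: enable the Kafka extension and point it at the broker
-Dorg.kie.kafka.server.ext.disabled=false
-Dorg.kie.server.jbpm-kafka.ext.bootstrap.servers=kafka1.example.com:9092,kafka2.example.com:9092
-Dorg.kie.server.jbpm-kafka.ext.client.id=kie-server-1
-Dorg.kie.server.jbpm-kafka.ext.group.id=my-process-consumers
-Dorg.kie.server.jbpm-kafka.ext.acks=all
```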
Configuring the use of a custom message format
By default, when using message events, the process engine sends and receives messages in a format compliant with the CloudEvents specification version 1.0.
Optionally, you can configure the use of a raw JSON data format or a custom format for the messages. If you want to use a custom format, you need to implement and provide classes.
- Your project uses message events to send or receive messages.
- If you want to use a custom format for sending or receiving messages, implement and provide custom classes:
  - Develop the source code for the classes:
    - To send messages, develop a class that implements the KafkaEventWriter interface.
    - To receive messages, develop a class that implements the KafkaEventReader interface.
    You can download the interface definitions from the GitHub repository.
  - Provide the classes to your business application. For instructions, see Providing a custom class to your business application in Business Central.
- Set the following KIE Server system properties to set the custom writer or reader:
Table 4. KIE Server system properties for setting a custom writer or reader
- org.kie.server.jbpm-kafka.ext.eventWriterClass: The custom event writer class. Set this property to use a different format to send messages. If you want to use a custom format, set the property to the fully qualified name of your custom event writer class. If you want to use a raw JSON data format, set the property to org.kie.server.services.jbpm.kafka.RawJsonEventWriter.
- org.kie.server.jbpm-kafka.ext.eventReaderClass: The custom event reader class. Set this property to use a different format to receive messages. If you want to use a custom format, set the property to the fully qualified name of your custom event reader class. If you want to use a raw JSON data format, set the property to org.kie.server.services.jbpm.kafka.RawJsonEventReader.
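For example, to switch both directions to the built-in raw JSON format instead of CloudEvents, set both properties to the classes named in the table:

```
# Use raw JSON instead of the default CloudEvents format for message events
-Dorg.kie.server.jbpm-kafka.ext.eventWriterClass=org.kie.server.services.jbpm.kafka.RawJsonEventWriter
-Dorg.kie.server.jbpm-kafka.ext.eventReaderClass=org.kie.server.services.jbpm.kafka.RawJsonEventReader
```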
Configuring a service and KIE Server to send Kafka messages when a transaction is committed
You can configure KIE Server with an emitter that sends Kafka messages automatically. In this case, KIE Server sends a message every time a task, process, case, or variable is created, updated, or deleted. The Kafka message contains information about the modified object. KIE Server sends the message when it commits the transaction with the change.
You can use this functionality with any business process or case. You do not need to change anything in the process design.
This configuration is also available if you run your process service using SpringBoot.
By default, KIE Server publishes the messages in the following topics:
- jbpm-processes-events for messages about completed processes
- jbpm-tasks-events for messages about completed tasks
- jbpm-cases-events for messages about completed cases
You can configure the topic names.
The published messages comply with the CloudEvents specification version 1.0. Each message contains the following fields:
- id: The unique identifier of the event
- type: The type of the event (process, task, or case)
- source: The event source as a URI
- time: The timestamp of the event, by default in the RFC3339 format
- data: Information about the process, case, or task, presented in a JSON format
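As an illustration only, a process event published by the emitter might look like the following. All values, and the shape of the data payload, are hypothetical:

```
{
  "id": "c1d2e3f4-5678-90ab-cdef-1234567890ab",
  "type": "process",
  "source": "/process/evaluation",
  "time": "2024-05-07T10:15:30.000+0000",
  "data": { "processId": "evaluation", "processInstanceId": 42, "state": 2 }
}
```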
- A KIE Server instance is installed.
- To send Kafka messages automatically, complete one of the following tasks:
  - If you deployed KIE Server on Red Hat JBoss EAP or another application server, complete the following steps:
    - Download the bamoe-8.0.4-maven-repository.zip product deliverable file from the Software Downloads page of the Red Hat Customer Portal.
    - Extract the contents of the file.
    - Copy the maven-repository/org/jbpm/jbpm-event-emitters-kafka/7.67.2.Final-redhat-0017/jbpm-event-emitters-kafka-7.67.2.Final-redhat-0017.jar file into the deployments/kie-server.war/WEB-INF/lib subdirectory of the application server.
  - If you deployed the application using SpringBoot, add the following lines to the <dependencies> list in the pom.xml file of your service:

    <dependency>
      <groupId>org.jbpm</groupId>
      <artifactId>jbpm-event-emitters-kafka</artifactId>
      <version>${version.org.kie}</version>
    </dependency>
- Configure any of the following KIE Server system properties as necessary:
Table 5. KIE Server system properties related to the Kafka emitter
- org.kie.jbpm.event.emitters.kafka.bootstrap.servers: The host and port of the Kafka broker. You can use a comma-separated list of multiple host:port pairs. Default value: localhost:9092
- org.kie.jbpm.event.emitters.kafka.date_format: The timestamp format for the time field of the messages. Default value: yyyy-MM-dd'T'HH:mm:ss.SSSZ
- org.kie.jbpm.event.emitters.kafka.topic.processes: The topic name for process event messages. Default value: jbpm-processes-events
- org.kie.jbpm.event.emitters.kafka.topic.cases: The topic name for case event messages. Default value: jbpm-cases-events
- org.kie.jbpm.event.emitters.kafka.topic.tasks: The topic name for task event messages. Default value: jbpm-tasks-events
- org.kie.jbpm.event.emitters.kafka.client.id: An identifier string to pass to the server when making requests. The server uses this string for logging.
- org.kie.jbpm.event.emitters.kafka.property_name: Set any Red Hat AMQ Streams consumer or producer property by using this prefix. For example, to set a value for the buffer.memory producer property, set the org.kie.jbpm.event.emitters.kafka.buffer.memory KIE Server system property. This setting applies when KIE Server is configured with an emitter to send Kafka messages automatically when completing transactions. For a list of Red Hat AMQ Streams consumer and producer properties, see the Consumer configuration parameters and Producer configuration parameters appendixes in Using AMQ Streams on RHEL.
- org.kie.jbpm.event.emitters.eagerInit: By default, KIE Server initializes the Kafka emitter only when sending a message. If you want to initialize the Kafka emitter when KIE Server starts, set this property to true. When KIE Server initializes the Kafka emitter, it logs any errors in Kafka emitter configuration and any Kafka communication errors. If you set this property to true, any such errors appear in the log output when KIE Server starts. Default value: false
Providing a custom class to your business application in Business Central
To interact with Red Hat AMQ Streams, your business application requires a custom class in the following cases:
- You want to use a custom message format for sending or receiving messages using message events.
- You want to use a custom serializer class for the KafkaPublishMessages custom task.
To use a custom class in your business application, use Business Central to upload the source code and configure the class.
Alternatively, if you deploy your application on SpringBoot, you can compile the classes separately and include them in the class path. In this case, do not complete this procedure.
- You are logged in to Business Central and have permission to edit business processes.
- You created a project for your business process.
- Prepare Java source files with the required custom classes, for example, MyCustomSerializer. Use the package name for your space and project, for example, com.myspace.test.
- In Business Central, enter your project and click the Settings → Dependencies tab.
- In the Dependencies field, add dependencies that your custom classes require, for example, org.apache.kafka.kafka-clients, as a comma-separated list.
- Click the Assets tab.
- For each of the class source files, complete the following steps:
  - Click Import Asset.
  - In the Please select a file to upload field, select the location of the Java source file for the custom serializer class.
  - Click OK.
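As a sketch of what such a custom class might look like, the following shows the core of a value serializer. The real class would implement org.apache.kafka.common.serialization.Serializer from the kafka-clients dependency; here a minimal stand-in interface is declared so the sketch is self-contained, and the class name and JSON envelope are illustrative assumptions only:

```java
import java.nio.charset.StandardCharsets;

// Minimal stand-in for org.apache.kafka.common.serialization.Serializer<T>,
// declared locally so this sketch compiles without the kafka-clients dependency.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical custom serializer: wraps the value in a small JSON envelope.
public class MyCustomSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            return null; // a null payload is passed through unchanged
        }
        String json = String.format("{\"topic\":\"%s\",\"value\":\"%s\"}", topic, data);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = new MyCustomSerializer().serialize("orders", "order-42");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the real class, replace the local stand-in interface with the import from kafka-clients and keep the package name aligned with your space and project, for example com.myspace.test.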