As a developer, you can integrate IBM Business Automation Manager Open Editions with Red Hat AMQ Streams or Apache Kafka. A business process can send and receive Kafka messages.

Kafka messages in a business process

Red Hat AMQ Streams, based on Apache Kafka, is a streaming platform. It acts as a message broker, passing messages, which are sorted into topics, between applications in a software environment.

Using IBM Business Automation Manager Open Editions, you can create business processes that send and receive Kafka messages by using any of the following methods:

  • Create a start event, intermediate catch event, or boundary event (attached to a human task) of the message type. KIE Server automatically subscribes to the Kafka topic that is defined in the message. A message triggers the event. The event node acts as the consumer of the message and can pass the content of the message to the subsequent node in the process.

  • Create an end event or intermediate throw event of the type message. When the process triggers the event, KIE Server sends a Kafka message in the topic that is defined in the message. The message contains the data that is configured in the event. The event node acts as the producer of the message.

  • Add the KafkaPublishMessages custom task to the process. This task does not require the KIE Server Kafka capability but can be more complicated to configure than message events.

  • Configure your service and KIE Server with an emitter to automatically send Kafka messages about every completed process, case, and task when transactions are committed.

Creating an event that receives Kafka messages

When designing your business process in Business Central, you can create an event that receives Kafka messages.

This event is triggered each time a message arrives in the configured topic. The message must contain data that matches a predefined data object. The process engine parses the message and provides it as an output of the event.
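
As a rough illustration, a data object for such an event can be a plain serializable Java class like the one in the following sketch. The class name OrderInfo, its fields, and the sample payload are hypothetical. The sketch also assumes the default CloudEvents format, where the JSON representation of the object travels in the data field of the message.

```java
import java.io.Serializable;

// Hypothetical data object. In Business Central it would be a public class
// in the project package, for example com.myspace.test.
final class OrderInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical JSON body whose field names match the properties of this class.
    static final String SAMPLE_DATA = "{\"orderId\":\"A-100\",\"quantity\":3}";

    private String orderId;
    private int quantity;

    // A no-argument constructor lets a JSON parser instantiate the object.
    OrderInfo() { }

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }

    public static void main(String[] args) {
        OrderInfo order = new OrderInfo();
        order.setOrderId("A-100");
        order.setQuantity(3);
        System.out.println(order.getOrderId() + " x " + order.getQuantity());
        System.out.println("Matching payload: " + SAMPLE_DATA);
    }
}
```

A message whose data does not map onto the properties of the data object cannot be parsed into it, so it is worth keeping the field names in the producer and in the data object in step.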

Prerequisites
  • You are logged in to Business Central and have permission to edit business processes.

  • You created a business process.

Procedure
  1. In Business Central, open the project that contains your business process.

  2. Create a data object defining the data that the message must contain. For instructions about creating data objects, see Designing business processes using BPMN models.

  3. Select the business process and open the business process designer.

  4. Add a start event, an intermediate catch event, or a boundary event that is attached to a human task. The event must have the message type.

  5. Open the properties of the event.

  6. In the Message field, select New and then enter the name of the message. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else must be defined as topic-name in an org.kie.server.jbpm-kafka.ext.topics.topic-name KIE Server system property.

    For instructions about using org.kie.server.jbpm-kafka.ext.topics.* system properties to define topic names, see Configuring KIE Server to send and receive Kafka messages from the process.

  7. Add an output data item and set the type of this data item to the data object that you created.

  8. Save the business process.

Note

If a Kafka message event starts a new process instance, the initiator field of the instance is set to unknown by default, because the Kafka consumer extension does not receive the identity of the user associated with the Kafka message.

Next steps

To enable Red Hat AMQ Streams integration when running the process, you must configure KIE Server. For more information about configuring KIE Server for Red Hat AMQ Streams integration, see Configuring KIE Server to send and receive Kafka messages from the process.

Creating an event that sends Kafka messages

When designing your business process in Business Central, you can create an event that sends Kafka messages.

The event can have a data object as an input data item. The process engine sends the content of a data object as a message in the configured topic.

Prerequisites
  • You are logged in to Business Central and have permission to edit business processes.

  • You created a business process.

Procedure
  1. In Business Central, open the project that contains your business process.

  2. Create a data object defining the data that the message must contain. For instructions about creating data objects, see Designing business processes using BPMN models.

  3. Select the business process and open the business process designer.

  4. Add an intermediate throw event or an end event of the type message.

  5. Open the properties of the event.

  6. In the Message field, select New and then enter the name of the message. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else must be defined as topic-name in an org.kie.server.jbpm-kafka.ext.topics.topic-name KIE Server system property.

    For instructions about using org.kie.server.jbpm-kafka.ext.topics.* system properties to define topic names, see Configuring KIE Server to send and receive Kafka messages from the process.

  7. Add an input data item and set the type of this data item to the data object that you created.

  8. Save the business process.

Next steps

To enable Red Hat AMQ Streams integration when running the process, you must configure KIE Server. For more information about configuring KIE Server for Red Hat AMQ Streams integration, see Configuring KIE Server to send and receive Kafka messages from the process.

Adding a custom task that sends Kafka messages

You can add a KafkaPublishMessages custom task to your process. This task sends Kafka messages. It does not use the KIE Server Kafka capability, so you can use this task in processes that do not run on a KIE Server. However, this task can be more complicated to configure than other Red Hat AMQ Streams integration options.

Prerequisites
  • You are logged in to Business Central as an administrative user.

Procedure
  1. In the Business Central administrative settings menu, select Custom Tasks Administration.

  2. Ensure that KafkaPublishMessages is set to On.

  3. In Business Central, select Menu → Design → Projects and then click the space name and the project name.

  4. Select the Settings → Custom Tasks tab.

  5. In the KafkaPublishMessages line, click Install.

  6. Optional: Enter the following information. If you leave any of the fields empty, the process engine uses the default values for these fields.

    • Bootstrap Servers: The host and port of the Kafka broker. You can use a comma-separated list of multiple host:port pairs. The default value is localhost:9092.

    • Client ID: An identifier string to pass to the broker when making requests. Red Hat AMQ Streams uses this string for logging. The default value is

    • Key Serializer class: The class that provides the key serializer. The default value is org.apache.kafka.common.serialization.StringSerializer. You can enter a different value if you want to use your own custom serializer class.

    • Value Serializer class: The class that provides the value serializer. The default value is org.apache.kafka.common.serialization.StringSerializer. You can enter a different value if you want to use your own custom serializer class.

      In any of these fields, you can enter an env[property] value. In this case, the process engine reads the setting from a system property at runtime. For example, you can set Client ID to env[application.client.id] and then, before running the process service, set the client ID value in the application.client.id system property.

  7. If you entered the name of a custom serializer class, complete the following additional configuration:

    1. Select the Settings → Deployments → Work Item Handlers tab.

    2. In the KafkaPublishMessages line, modify the Value field to add the classLoader parameter. For example, the initial value of this field can be the following string:

      new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer")

      In this example, change the value to the following string:

      new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer", classLoader)
  8. Select the Assets tab.

  9. Select the business process and open the business process designer.

  10. Add the KafkaPublishMessages custom task, available under Custom Tasks in the BPMN modeler palette.

  11. In the properties of the custom task, open the data assignments.

  12. Assign the Key, Topic, and Value inputs to define the message.
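
The env[property] syntax described in step 6 can be pictured with the following sketch. It is not the process engine's implementation. The class and method names are hypothetical, and the sketch only illustrates the idea that a value of the form env[name] is replaced by the system property called name at runtime, while any other value is used literally.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that mimics how an env[...] setting could be resolved.
final class EnvPlaceholder {
    // Matches values of the form env[property.name] and captures the property name.
    private static final Pattern ENV = Pattern.compile("^env\\[(.+)\\]$");

    // Returns the system property named inside env[...], or the value itself
    // when it is a plain literal. Returns null if the property is not set.
    static String resolve(String configured) {
        Matcher matcher = ENV.matcher(configured);
        if (!matcher.matches()) {
            return configured;
        }
        return System.getProperty(matcher.group(1));
    }

    public static void main(String[] args) {
        // Normally set with -Dapplication.client.id=... before the service starts.
        System.setProperty("application.client.id", "orders-service");

        System.out.println(resolve("env[application.client.id]")); // orders-service
        System.out.println(resolve("localhost:9092"));             // localhost:9092
    }
}
```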

Next steps

If you entered a custom serializer class, you must provide this class to your business application. For instructions about providing custom classes to your business application, see Providing a custom class to your business application in Business Central.

Configuring KIE Server to send and receive Kafka messages from the process

To run a process that sends or receives Kafka messages using events, you must use KIE Server. You must configure the KIE Server instance to integrate with Red Hat AMQ Streams.

Prerequisites
  • A KIE Server instance is installed.

Procedure
  1. To enable integration with Red Hat AMQ Streams, set the following system properties according to your environment:

    • If you are using KIE Server on Red Hat JBoss EAP, set the org.kie.kafka.server.ext.disabled KIE Server system property to false.

    • If you are using Spring Boot, set the kieserver.kafka.enabled system property to true.

  2. To configure the connection to the Kafka broker, set the org.kie.server.jbpm-kafka.ext.bootstrap.servers system property to the host and port of the broker. The default value is localhost:9092. You can use a comma-separated list of multiple host:port pairs.

  3. Optional: Set any of the following system properties related to both sending and receiving Kafka messages:

    Table 1. Optional KIE Server system properties related to both sending and receiving Kafka messages
    Property Description

    org.kie.server.jbpm-kafka.ext.client.id

    An identifier string to pass to the broker when making requests. Red Hat AMQ Streams uses this string for logging.

    org.kie.server.jbpm-kafka.ext.topics.*

    Mapping of message names to topic names. For example, if you want to send or receive a message in the ExampleTopic topic when ExampleName is the name of the message, set the org.kie.server.jbpm-kafka.ext.topics.ExampleName system property to ExampleTopic. You can set any number of such system properties. If a message name is not mapped using a system property, the process engine uses this name as the topic name.

    org.kie.server.jbpm-kafka.ext.property_name

    Set any Red Hat AMQ Streams consumer or producer property by using the org.kie.server.jbpm-kafka.ext prefix. For example, to set a value for the buffer.memory producer property, set the org.kie.server.jbpm-kafka.ext.buffer.memory KIE Server system property.

    This setting applies to all processes that send or receive Kafka messages using events on this KIE Server.

    For a list of Red Hat AMQ Streams consumer and producer properties, see the Consumer configuration parameters and Producer configuration parameters appendixes in Using AMQ Streams on RHEL.

  4. Optional: Set any of the following system properties related to receiving Kafka messages:

    Table 2. Optional KIE Server system properties related to receiving Kafka messages
    Property Description Default value

    org.kie.server.jbpm-kafka.ext.allow.auto.create.topics

    Allow automatic topic creation.

    true

    org.kie.server.jbpm-kafka.ext.group.id

    A unique string that identifies the group to which this Kafka message consumer belongs.

    jbpm-consumer

  5. Optional: Set any of the following system properties related to sending Kafka messages:

    Table 3. Optional KIE Server system properties related to sending Kafka messages
    Property Description Default value

    org.kie.server.jbpm-kafka.ext.acks

    The number of acknowledgements that the Kafka leader must receive before marking the request as complete.

    1. This value means the leader writes the record to its local log and then responds to the process engine, without waiting for full acknowledgement from all followers.

    org.kie.server.jbpm-kafka.ext.max.block.ms

    The number of milliseconds for which the publish method blocks. After this time, the process engine can resume execution of the business process.

    2000 (2 seconds).
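
The mapping of message names to topic names described in Table 1 can be sketched as follows. This is an illustration of the documented fallback rule only, not KIE Server code, and the class and method names are hypothetical. The property prefix and the ExampleName and ExampleTopic values come from the table.

```java
// Hypothetical helper that illustrates the message-name-to-topic rule.
final class TopicNames {
    static final String TOPICS_PREFIX = "org.kie.server.jbpm-kafka.ext.topics.";

    // Uses the mapped topic if a mapping property exists;
    // otherwise the message name itself is the topic name.
    static String topicFor(String messageName) {
        return System.getProperty(TOPICS_PREFIX + messageName, messageName);
    }

    public static void main(String[] args) {
        // On a real server this would be a JVM system property set at startup.
        System.setProperty(TOPICS_PREFIX + "ExampleName", "ExampleTopic");

        System.out.println(topicFor("ExampleName"));  // ExampleTopic (mapped)
        System.out.println(topicFor("OtherMessage")); // OtherMessage (no mapping)
    }
}
```

On a running KIE Server, the same mapping would typically be supplied as a JVM argument such as -Dorg.kie.server.jbpm-kafka.ext.topics.ExampleName=ExampleTopic rather than set in code.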

Configuring the use of a custom message format

By default, when using message events, the process engine sends and receives messages in a format compliant with the CloudEvents specification version 1.0.

Optionally, you can configure the use of a raw JSON data format or a custom format for the messages. If you want to use a custom format, you need to implement and provide classes.

Prerequisites
  • Your project uses message events to send or receive messages.

Procedure
  1. If you want to use a custom format for sending or receiving messages, implement and provide custom classes:

    1. Develop the source code for the classes:

      • To send messages, develop a class that implements the KafkaEventWriter interface

      • To receive messages, develop a class that implements the KafkaEventReader interface

        You can download the interface definitions from the GitHub repository.

    2. Provide the classes to your business application. For instructions, see Providing a custom class to your business application in Business Central.

  2. Set the following KIE Server system properties to set the custom writer or reader:

    Table 4. KIE Server system properties for setting a custom writer or reader
    Property Description

    org.kie.server.jbpm-kafka.ext.eventWriterClass

    The custom event writer class. Set this property to use a different format to send messages. If you want to use a custom format, set the property to the fully qualified name of your custom event writer class. If you want to use a raw JSON data format, set the property to org.kie.server.services.jbpm.kafka.RawJsonEventWriter.

    org.kie.server.jbpm-kafka.ext.eventReaderClass

    The custom event reader class. Set this property to use a different format to receive messages. If you want to use a custom format, set the property to the fully qualified name of your custom event reader class. If you want to use a raw JSON data format, set the property to org.kie.server.services.jbpm.kafka.RawJsonEventReader.

Configuring a service and KIE Server to send Kafka messages when a transaction is committed

You can configure KIE Server with an emitter that sends Kafka messages automatically. In this case, KIE Server sends a message every time a task, process, case, or variable is created, updated, or deleted. The Kafka message contains information about the modified object. KIE Server sends the message when it commits the transaction with the change.

You can use this functionality with any business process or case. You do not need to change anything in the process design.

This configuration is also available if you run your process service using Spring Boot.

By default, KIE Server publishes the messages in the following topics:

  • jbpm-processes-events for messages about completed processes

  • jbpm-tasks-events for messages about completed tasks

  • jbpm-cases-events for messages about completed cases

You can configure the topic names.

The published messages comply with the CloudEvents specification version 1.0. Each message contains the following fields:

  • id: The unique identifier of the event

  • type: The type of the event (process, task, or case)

  • source: The event source as a URI

  • time: The timestamp of the event, by default in the RFC3339 format

  • data: Information about the process, case, or task, presented in a JSON format
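
To make the field list concrete, the following sketch assembles a message envelope with the five fields listed above. It is an illustration only. The class name, the source URI, and the content of the data field are hypothetical, and actual messages can carry further attributes that are not shown here.

```java
// Hypothetical sketch of the overall shape of an emitter message.
final class EmitterMessageSketch {
    // Builds a JSON envelope from the five documented fields.
    // dataJson must already be valid JSON; the other values are not escaped here.
    static String envelope(String id, String type, String source, String time, String dataJson) {
        return String.format(
                "{\"id\":\"%s\",\"type\":\"%s\",\"source\":\"%s\",\"time\":\"%s\",\"data\":%s}",
                id, type, source, time, dataJson);
    }

    public static void main(String[] args) {
        String message = envelope(
                "f1b0c7de-0001",                      // unique event identifier
                "process",                            // process, task, or case
                "/process/example-process/1",         // hypothetical source URI
                "2024-01-15T10:30:00.123+0000",       // timestamp of the event
                "{\"processId\":\"example-process\"}" // hypothetical data content
        );
        System.out.println(message);
    }
}
```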

Prerequisites
  • A KIE Server instance is installed.

Procedure
  1. To send Kafka messages automatically, complete one of the following tasks:

    1. If you deployed KIE Server on Red Hat JBoss EAP or another application server, complete the following steps:

      1. Download the bamoe-8.0.4-maven-repository.zip product deliverable file from the Software Downloads page of the Red Hat Customer Portal.

      2. Extract the contents of the file.

      3. Copy the maven-repository/org/jbpm/jbpm-event-emitters-kafka/7.67.2.Final-redhat-0017/jbpm-event-emitters-kafka-7.67.2.Final-redhat-0017.jar file into the deployments/kie-server.war/WEB-INF/lib subdirectory of the application server.

    2. If you deployed the application using Spring Boot, add the following lines to the <dependencies> list in the pom.xml file of your service:

      <dependency>
        <groupId>org.jbpm</groupId>
        <artifactId>jbpm-event-emitters-kafka</artifactId>
        <version>${version.org.kie}</version>
      </dependency>
  2. Configure any of the following KIE Server system properties as necessary:

    Table 5. KIE Server system properties related to the Kafka emitter
    Property Description Default value

    org.kie.jbpm.event.emitters.kafka.bootstrap.servers

    The host and port of the Kafka broker. You can use a comma-separated list of multiple host:port pairs.

    localhost:9092

    org.kie.jbpm.event.emitters.kafka.date_format

    The timestamp format for the time field of the messages.

    yyyy-MM-dd'T'HH:mm:ss.SSSZ

    org.kie.jbpm.event.emitters.kafka.topic.processes

    The topic name for process event messages.

    jbpm-processes-events

    org.kie.jbpm.event.emitters.kafka.topic.cases

    The topic name for case event messages.

    jbpm-cases-events

    org.kie.jbpm.event.emitters.kafka.topic.tasks

    The topic name for task event messages.

    jbpm-tasks-events

    org.kie.jbpm.event.emitters.kafka.client.id

    An identifier string to pass to the server when making requests. The server uses this string for logging.

    org.kie.jbpm.event.emitters.kafka.property_name

    Set any Red Hat AMQ Streams consumer or producer property by using this prefix. For example, to set a value for the buffer.memory producer property, set the org.kie.jbpm.event.emitters.kafka.buffer.memory KIE Server system property.

    This setting applies when KIE Server is configured with an emitter to send Kafka messages automatically when completing transactions.

    For a list of Red Hat AMQ Streams consumer and producer properties, see the Consumer configuration parameters and Producer configuration parameters appendixes in Using AMQ Streams on RHEL.

    org.kie.jbpm.event.emitters.eagerInit

    By default, KIE Server initializes the Kafka emitter only when sending a message. If you want to initialize the Kafka emitter when KIE Server starts, set this property to true.

    When KIE Server initializes the Kafka emitter, it logs any errors in Kafka emitter configuration and any Kafka communication errors. If you set the org.kie.jbpm.event.emitters.eagerInit property to true, any such errors appear in the log output when KIE Server starts.

    false
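
The effect of the org.kie.jbpm.event.emitters.kafka.date_format property can be previewed with a sketch like the following, which formats a fixed instant with the default pattern and then with an override. The class name is hypothetical, and the use of java.time with a fixed UTC zone is an assumption made to keep the output reproducible. The emitter itself might use a different formatter or time zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that previews a timestamp pattern for the time field.
final class EmitterDateFormat {
    static final String PROPERTY = "org.kie.jbpm.event.emitters.kafka.date_format";
    static final String DEFAULT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

    // Formats the instant in UTC with the configured pattern,
    // or with the documented default if the property is not set.
    static String format(Instant instant) {
        String pattern = System.getProperty(PROPERTY, DEFAULT_PATTERN);
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(instant);
    }

    public static void main(String[] args) {
        Instant sample = Instant.parse("2024-01-15T10:30:00.123Z");

        System.out.println(format(sample)); // 2024-01-15T10:30:00.123+0000

        // Override the pattern, as the system property would on a real server.
        System.setProperty(PROPERTY, "yyyy-MM-dd HH:mm");
        System.out.println(format(sample)); // 2024-01-15 10:30
    }
}
```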

Providing a custom class to your business application in Business Central

To interact with Red Hat AMQ Streams, your business application requires a custom class in the following cases:

  • You want to use a custom message format for sending or receiving messages using message events.

  • You want to use a custom serializer class for the KafkaPublishMessages custom task.

To use a custom class in your business application, use Business Central to upload the source code and configure the class.

Alternatively, if you deploy your application on Spring Boot, you can compile the classes separately and include them in the class path. In this case, do not complete this procedure.

Prerequisites
  • You are logged in to Business Central and have permission to edit business processes.

  • You created a project for your business process.

Procedure
  1. Prepare Java source files with the required custom classes, for example, MyCustomSerializer. Use the package name for your space and project, for example, com.myspace.test.

  2. In Business Central, enter your project and click the Settings → Dependencies tab.

  3. In the Dependencies field, add dependencies that your custom classes require, for example, org.apache.kafka.kafka-clients, as a comma-separated list.

  4. Click the Assets tab.

  5. For each of the class source files, complete the following steps:

    1. Click Import Asset.

    2. In the Please select a file to upload field, select the location of the Java source file for the custom class.

    3. Click OK.

Additional Resources