The combination of events and business processes is a way to unlock powerful capabilities in automation solutions developed in BAMOE.

Combining events and Workflows

Example use cases include:

  • Process monitoring: emit events at various stages of a process to monitor its execution and create real-time dashboards.

  • Auditability: use detailed event logs to record significant process changes and provide a complete audit trail for compliance.

  • Event-based integration: enable loosely coupled, scalable microservices through actions triggered by events.

  • Process optimization: use events to enable external consumption of process changes and identify bottlenecks.

Messaging and event-driven capabilities are provided as modular add-ons that you enable in your Quarkus or Spring Boot projects. Events are published in the CloudEvents format, making them easily consumable by external systems.

Events are categorized as follows:

Messaging Events

Events related to BPMN message elements, which enable a process to register an event listener.

Process/Runtime Events

Events related to Process Instances, tasks, and variables.

Messaging add-on

The Messaging add-on enables integration between BPMN message elements and external event sources or listeners, depending on the message type:

  • Start message node

  • Intermediate message throw/catch nodes

  • End message node

  • Boundary message node

The following table shows more detail on message events and the corresponding Kafka topics used as channels to publish message events.

| Event Type | Published when | Kafka Topics |
|---|---|---|
| Start Message Event | A message is received on the incoming channel to start a Process Instance | Channel name configured in mp.messaging.incoming.<channel-name> (e.g., travelers) |
| Intermediate Message Throw Event | The throw event node is reached during process execution | Channel name configured in mp.messaging.outgoing.<channel-name> (e.g., correlation) |
| Intermediate Message Catch Event | The process waits at this node until a message is received on the incoming channel | Channel name configured in mp.messaging.incoming.<channel-name> (e.g., customers) |
| End Message Event | The Process Instance completes and sends a final message | Channel name configured in mp.messaging.outgoing.<channel-name> (e.g., processedtravelers) |
| Boundary Message Event | A message is received while the attached task is active | Channel name configured in mp.messaging.incoming.<channel-name> |

To configure messaging capabilities for your Business Services in Quarkus, add the org.kie:kie-addons-quarkus-messaging dependency. For details on the dependency, see Workflows (Quarkus add-ons and extra features).

Because the Quarkus add-on is vendor independent, you must also add the io.quarkus:quarkus-messaging-kafka dependency to use Kafka as the message broker. For more information, see the io.quarkus:quarkus-messaging-kafka external documentation.

An example application of an event-driven process (process-event-driven) developed with Quarkus is available in the BAMOE samples. See Example Business Service projects.

The Spring Boot add-on is Kafka-specific, so you only need to add the org.kie:kie-addons-springboot-messaging dependency to enable messaging. For more information, see the org.kie:kie-addons-springboot-messaging external documentation.

Correlating Messages to Engines at Runtime

Correlation uses three BPMN components to route messages to the correct process instance:

  1. Correlation Property - Defines which field to extract from the message

    Example: <bpmn2:messagePath>id</bpmn2:messagePath> extracts the id field

  2. Correlation Key - Groups correlation properties as a unique identifier

    Example: <bpmn2:correlationKey id="CorrelationKeyId"> references the property above

  3. Correlation Subscription - Defines the expected value for matching

    Example: <bpmn2:dataPath>"1"</bpmn2:dataPath> means only match if id = "1"

Runtime Behavior: When a message arrives, the engine:

  • Extracts the correlation property value (e.g., id field from message)

  • Checks if any process instance expects that specific value

  • Delivers the message only to matching instances

  • Discards messages with no match
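The runtime behavior above can be pictured with a small, self-contained simulation. This is only a sketch of the matching logic, not the engine's implementation: CorrelationSketch, subscribe, and route are hypothetical names, and messages are modeled as plain string maps.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation only: these types are hypothetical and are not part of the BAMOE API.
class CorrelationSketch {

    // Maps a process instance ID to the correlation value that the instance expects.
    private final Map<String, String> subscriptions = new LinkedHashMap<>();

    // Name of the message field used as the correlation property (the messagePath in BPMN).
    private final String messagePath;

    CorrelationSketch(String messagePath) {
        this.messagePath = messagePath;
    }

    void subscribe(String processInstanceId, String expectedValue) {
        subscriptions.put(processInstanceId, expectedValue);
    }

    // Returns the IDs of the instances that receive the message.
    // An empty list means that the message is discarded.
    List<String> route(Map<String, String> message) {
        List<String> matches = new ArrayList<>();
        String value = message.get(messagePath);      // extract the correlation property value
        if (value == null) {
            return matches;
        }
        for (Map.Entry<String, String> entry : subscriptions.entrySet()) {
            if (value.equals(entry.getValue())) {     // does an instance expect this value?
                matches.add(entry.getKey());          // deliver only to matching instances
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        CorrelationSketch engine = new CorrelationSketch("id");
        engine.subscribe("instance-A", "1");
        engine.subscribe("instance-B", "2");
        System.out.println(engine.route(Map.of("id", "1"))); // delivered to instance-A
        System.out.println(engine.route(Map.of("id", "9"))); // no match, so discarded
    }
}
```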

Complete BPMN Configuration

See the full correlation configuration in bamoe-examples/process-event-driven/hotels/src/main/resources/hotel.bpmn:

<bpmn2:itemDefinition id="CorrelationPropertyId_Type" structureRef="String" />
<bpmn2:correlationProperty id="CorrelationPropertyId" name="Correlation Property 1" type="CorrelationPropertyId_Type">
  <bpmn2:correlationPropertyRetrievalExpression
    id="_01E88DC7-3BDD-48E3-BCF9-39AD0772B2AB"
    messageRef="_GlqmEIpdED2dBNv7eRNehQ"
  >
    <bpmn2:messagePath
      id="_F1B5A911-766C-4E07-932E-A7C9BC825274"
      evaluatesToTypeRef="_6404189C-98F1-471A-899F-4A66C9E9BB0E"
      language="java"
    >id</bpmn2:messagePath>
  </bpmn2:correlationPropertyRetrievalExpression>
</bpmn2:correlationProperty>

<bpmn2:correlationKey id="CorrelationKeyId" name="Correlation Key Name">
  <bpmn2:correlationPropertyRef>CorrelationPropertyId</bpmn2:correlationPropertyRef>
</bpmn2:correlationKey>

Configuration in application.properties

The following properties are required to configure messaging channels for correlation:

# Kafka broker URL
kafka.bootstrap.servers=localhost:9092

# Incoming channel (for Start/Catch/Boundary events)
mp.messaging.incoming.<channel-name>.connector=smallrye-kafka
mp.messaging.incoming.<channel-name>.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.<channel-name>.auto.offset.reset=earliest

# Outgoing channel (for Throw/End events)
mp.messaging.outgoing.<channel-name>.connector=smallrye-kafka
mp.messaging.outgoing.<channel-name>.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Message Routing with kogitoprocrefid

For intermediate message workflows, kogitoprocrefid routes incoming messages to the correct waiting process instance. The incoming intermediate message must carry this field, and its value must be the ID of the Process Instance that is currently waiting. This is required for the incoming message to be assigned to the right Process Instance.

When is it needed?

Use kogitoprocrefid when you have multiple Process Instances waiting at the same intermediate catch event and need to route a message to a specific instance. Without kogitoprocrefid, the message will be delivered to any waiting Process Instance, which may not be the intended one.

How to include it in the message:

You can include kogitoprocrefid in two ways:

  • As a CloudEvent HTTP header (when sending via HTTP/REST):

ce-type: myevent
ce-source: /from/unit/test
ce-specversion: 1.0
ce-id: 21627e26-31eb-43e7-8343-92a696fd96b1
ce-kogitoprocrefid: 12345
Content-Type: application/json

  • As a field in the CloudEvent JSON payload (when sending via Kafka or HTTP with JSON payload):

{
  "kogitoprocrefid": "12345",
  "specversion": "0.3",
  "id": "21627e26-31eb-43e7-8343-92a696fd96b1",
  "source": "/process/instance/12345",
  "type": "process.persons.travellers",
  "time": "2019-10-01T12:02:23.812262+02:00",
  "data": {
    "firstName": "Jan",
    "lastName": "Kowalski",
    "email": "jan.kowalski@example.com",
    "nationality": "Polish"
  }
}
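A producer could assemble such a payload as follows. This is a minimal sketch under stated assumptions: ProcRefEnvelopeSketch and wrap are hypothetical names and not part of BAMOE, the specversion is fixed to 1.0 as in the header example, and values are concatenated without JSON escaping, so a real application would more likely use a JSON library.

```java
import java.util.UUID;

// Illustrative helper only: the class and method names are hypothetical, not a BAMOE API.
class ProcRefEnvelopeSketch {

    // Wraps an already-serialized JSON object in a structured CloudEvent that targets
    // one waiting process instance through the kogitoprocrefid field.
    // Assumption: the arguments contain no characters that need JSON escaping.
    static String wrap(String processInstanceId, String type, String source, String dataJson) {
        return "{"
                + "\"kogitoprocrefid\":\"" + processInstanceId + "\","
                + "\"specversion\":\"1.0\","
                + "\"id\":\"" + UUID.randomUUID() + "\","
                + "\"source\":\"" + source + "\","
                + "\"type\":\"" + type + "\","
                + "\"data\":" + dataJson
                + "}";
    }

    public static void main(String[] args) {
        String event = wrap("12345", "process.persons.travellers",
                "/from/example", "{\"firstName\":\"Jan\",\"lastName\":\"Kowalski\"}");
        System.out.println(event);
    }
}
```

The resulting string can then be sent as the record value on the incoming channel that the catch event listens to.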

Process event add-on

BAMOE provides the Process event add-on, which sends process, task, and variable events to an external event listener when the execution of an operation modifies the state of a process. These events are known as runtime events and can include any of the following:

  • Any change in the state of a Process Instance, including execution and violation of SLA

  • Any change of state of a task or its metadata, including violation of an SLA

  • Any change in existing process definitions at application start, including updates to metadata or structure. These events are used to capture and track the definition and version of processes.

The Process event add-on can also be used to build a historical representation of all Process Instance executions. If you use this add-on with the Process REST API, you can also build a custom graphical user interface to handle the user tasks.

Each modifying operation is executed within an abstraction called a unit of work. A runtime event is published when a unit of work is completed. Events are published in the CloudEvents format to one of three Kafka topics. The following table describes when each of the three supported event types is published.

Event type: ProcessInstanceEvent
Kafka topic: kogito-processinstances-events
Published when:

  • the Process Instance changes state: started, ended (completed or aborted), migrated to a different version, or updated (when the internal state changes, for example, when the SLA is retriggered)

  • there is an error in the process execution

  • a node is executed

  • there is a change in variables

  • an SLA is violated

Event type: UserTaskInstanceEvent
Kafka topic: kogito-usertaskinstances-events
Published when:

  • the task state changes (transitions between states)

  • the task assignments change (the owner or actor of the task)

  • the task deadlines are not met

  • there are variable changes (inputs or outputs)

  • there are changes in the task attachments (added, modified, deleted)

  • there are changes in the task comments (added, modified, deleted)

Event type: ProcessDefinitionEvent
Kafka topic: kogito-processdefinitions-events
Published when:

  • process definitions change (metadata and node structures)
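The unit-of-work idea described above, where events are published only when the unit of work completes, can be sketched as a simple buffer. This is an illustration under assumptions, not the engine's implementation: UnitOfWorkSketch, record, and complete are hypothetical names, and events are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative simulation only: hypothetical types, not the BAMOE unit-of-work API.
class UnitOfWorkSketch {

    // Events collected while the modifying operation runs.
    private final List<String> pending = new ArrayList<>();

    // Stand-in for the publisher that would send events to the Kafka topics.
    private final Consumer<List<String>> publisher;

    UnitOfWorkSketch(Consumer<List<String>> publisher) {
        this.publisher = publisher;
    }

    // Called for each state change that happens during the operation.
    void record(String event) {
        pending.add(event);
    }

    // Called when the operation finishes: only now do the events leave the unit of work.
    void complete() {
        publisher.accept(List.copyOf(pending));
        pending.clear();
    }

    public static void main(String[] args) {
        UnitOfWorkSketch unit = new UnitOfWorkSketch(
                events -> System.out.println("published: " + events));
        unit.record("ProcessInstanceEvent: node triggered");
        unit.record("ProcessInstanceEvent: variable changed");
        System.out.println("nothing published yet");
        unit.complete();
    }
}
```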

In Quarkus, you configure process event capabilities for your Business Services by adding the process event add-on using the org.kie:kie-addons-quarkus-events-process dependency. For more information, see Workflows (Quarkus add-ons and extra features).

In Spring Boot, you configure process event capabilities for your Business Services by adding the org.kie:kie-addons-springboot-events-process-kafka dependency. For more information, see org.kie:kie-addons-springboot-events-process-kafka external documentation.

The Process event add-on for Spring Boot is specific to Kafka. Therefore, you do not need to add additional dependencies to use an event broker.

For Quarkus, the BAMOE process event add-on implementation is based on the SmallRye Messaging library, which provides a set of connectors for event brokers, such as JMS, AMQP, and Kafka. The SmallRye Kafka connector is provided by the io.quarkus:quarkus-messaging-kafka dependency.

SmallRye defines an abstraction named channel to enable multi-broker support. For every channel that you define in your Business Service application, you can specify the connector to use for that channel using the following form:

mp.messaging.[incoming|outgoing].<channel name>.connector = <connector name>

Optionally, for the Kafka connector, you can define the Kafka topic to be used for that channel using the following form:

mp.messaging.[incoming|outgoing].<channel name>.topic = <topic name>

You can also set up a channel property using the following form:

mp.messaging.[incoming|outgoing].<channel name>.<property name> = <property value>

If the topic property is not defined, the channel name is used as the Kafka topic name.
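The relationship between a channel and its topic can be pictured with a small lookup. This is a minimal sketch, not SmallRye code: ChannelTopicSketch and resolveTopic are hypothetical names, and the sketch assumes the fallback to the channel name that the SmallRye Kafka connector applies when no topic is configured.

```java
import java.util.Properties;

// Illustrative lookup only: hypothetical helper, not part of SmallRye or BAMOE.
class ChannelTopicSketch {

    // Returns the configured topic for a channel, or the channel name when
    // no mp.messaging.<direction>.<channel>.topic property is present.
    static String resolveTopic(Properties config, String direction, String channel) {
        String key = "mp.messaging." + direction + "." + channel + ".topic";
        return config.getProperty(key, channel);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("mp.messaging.outgoing.audit.topic", "audit-events");

        // Explicit topic property: the configured value is used.
        System.out.println(resolveTopic(config, "outgoing", "audit"));
        // No topic property: the channel name doubles as the topic name.
        System.out.println(resolveTopic(config, "incoming", "travelers"));
    }
}
```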

The process event add-on defines a channel for each event type (kogito-processinstances-events, kogito-usertaskinstances-events, kogito-variables-events, kogito-processdefinitions-events). Therefore, when the process event add-on is enabled, you must configure these channels in your application.properties file using the following properties:

# Kafka broker URL
kafka.bootstrap.servers=localhost:9092

# Process instance events
mp.messaging.outgoing.kogito-processinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-processinstances-events.topic=kogito-processinstances-events
mp.messaging.outgoing.kogito-processinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# User task events
mp.messaging.outgoing.kogito-usertaskinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-usertaskinstances-events.topic=kogito-usertaskinstances-events
mp.messaging.outgoing.kogito-usertaskinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Variable events
mp.messaging.outgoing.kogito-variables-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-variables-events.topic=kogito-variables-events
mp.messaging.outgoing.kogito-variables-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Process definition events
mp.messaging.outgoing.kogito-processdefinitions-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-processdefinitions-events.topic=kogito-processdefinitions-events
mp.messaging.outgoing.kogito-processdefinitions-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Additionally, you can disable the publishing on any channel by setting the related property to false as shown in the following example:

kogito.events.usertasks.enabled=false
kogito.events.variables.enabled=false

Managing Event Data

When designing Business Services with event-driven capabilities it is important to evaluate and define how event data is handled:

Security and privacy

If your events and messages transmit sensitive data, review the recommended methods for securing data and for integrating securely with your events platform.

High volume of events

Be prepared to manage high volumes of events, in particular for frequently executed processes. To do this, put scaling and event buffering mechanisms in place in the Kafka environment.

Event retention policy

Storing events for long periods of time requires more storage resources which directly impacts the final cost of your system. You can mitigate this by adjusting event retention policies.

Failure strategy

Message consumption can fail for different reasons, such as failure to parse the message payload. Such failures are handled by the Kafka integration code, and can be implemented through different strategies. For Spring Boot, the default failure strategy is retry. For Quarkus, the default failure strategy is fail-fast, which is configurable through the following property:

mp.messaging.incoming.<channel name>.failure-strategy=fail-fast|ignore|dead-letter-queue

Note

Because the fail-fast strategy stops the application from consuming further messages, it is recommended to configure one of the other available strategies when using event data in a Quarkus application.
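The practical difference between the three Quarkus strategies can be illustrated with a small simulation. This is a sketch under assumptions and not connector code: FailureStrategySketch and its members are hypothetical names, and parsing a number stands in for parsing a message payload.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation only: hypothetical types, not the SmallRye Kafka connector.
class FailureStrategySketch {

    enum Strategy { FAIL_FAST, IGNORE, DEAD_LETTER_QUEUE }

    static class Result {
        final List<String> processed = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
        boolean stopped;
    }

    static Result consume(List<String> messages, Strategy strategy) {
        Result result = new Result();
        for (String message : messages) {
            try {
                Integer.parseInt(message);           // stand-in for payload parsing
                result.processed.add(message);
            } catch (NumberFormatException e) {
                switch (strategy) {
                    case FAIL_FAST:                  // stop consuming after the first failure
                        result.stopped = true;
                        return result;
                    case DEAD_LETTER_QUEUE:          // set the failed message aside and continue
                        result.deadLetters.add(message);
                        break;
                    case IGNORE:                     // skip the failed message and continue
                        break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("1", "oops", "3");
        for (Strategy strategy : Strategy.values()) {
            Result r = consume(input, strategy);
            System.out.println(strategy + ": processed=" + r.processed
                    + ", deadLetters=" + r.deadLetters + ", stopped=" + r.stopped);
        }
    }
}
```

With fail-fast, the message after the malformed one is never processed, which is why one of the other strategies is usually preferable for long-running services.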

Implementing an Event Listener Lifecycle in the Process Engine

Every time a process or task changes to a different point in its lifecycle, the Process Engine generates an event. You can develop a class that receives and processes such events. This class is called an event listener. The Process Engine passes an event object to this class. The object provides access to related information. For example, if the event is related to a process node, the object provides access to the Process Instance and the node instance. The Process Engine provides a default implementation of the event listener interface, which you can use as a starting point for your own implementation. Listeners implement one of the following two interfaces: ProcessEventListener and UserTaskEventListener.

The Process Event Listener Interface

You can develop a class that implements the ProcessEventListener interface. This class listens to process lifecycle events, such as starting or completing a process, entering or leaving a node, or changing a variable.

The following code shows the different methods of the ProcessEventListener which you can implement to process the corresponding event:

package org.kie.api.event.process;

import java.util.EventListener;

/**
 * A listener for events related to Process Instance execution.
 */
public interface ProcessEventListener extends EventListener {

    /**
     * This listener method is invoked immediately before a Process Instance is started.
     * @param event
     */
    void beforeProcessStarted(ProcessStartedEvent event);

    /**
     * This listener method is invoked immediately after a Process Instance has been started.
     * @param event
     */
    void afterProcessStarted(ProcessStartedEvent event);

    /**
     * This listener method is invoked immediately before a Process Instance is completed (or aborted).
     * @param event
     */
    void beforeProcessCompleted(ProcessCompletedEvent event);

    /**
     * This listener method is invoked immediately after a Process Instance has been completed (or aborted).
     * @param event
     */
    void afterProcessCompleted(ProcessCompletedEvent event);

    /**
     * This listener method is invoked immediately before a node in a Process Instance is triggered
     * (which is when the node is entered, for example when an incoming connection triggers it).
     * @param event
     */
    void beforeNodeTriggered(ProcessNodeTriggeredEvent event);

    /**
     * This listener method is invoked immediately after a node in a Process Instance has been triggered
     * (which is when the node was entered, for example when an incoming connection triggered it).
     * @param event
     */
    void afterNodeTriggered(ProcessNodeTriggeredEvent event);

    /**
     * This listener method is invoked immediately before a node in a Process Instance is left
     * (which is when the node is completed, for example when it has performed the task it was
     * designed for).
     * @param event
     */
    void beforeNodeLeft(ProcessNodeLeftEvent event);

    /**
     * This listener method is invoked immediately after a node in a Process Instance has been left
     * (which is when the node was completed, for example when it performed the task it was
     * designed for).
     * @param event
     */
    void afterNodeLeft(ProcessNodeLeftEvent event);

    /**
     * This listener method is invoked immediately before the value of a process variable is changed.
     * @param event
     */
    void beforeVariableChanged(ProcessVariableChangedEvent event);

    /**
     * This listener method is invoked immediately after the value of a process variable has been changed.
     * @param event
     */
    void afterVariableChanged(ProcessVariableChangedEvent event);

    /**
     * This listener method is invoked immediately before the SLA of a Process/node Instance is violated.
     * @param event
     */
    default void beforeSLAViolated(SLAViolatedEvent event) {}

    /**
     * This listener method is invoked immediately after the SLA of a Process/node Instance has been violated.
     * @param event
     */
    default void afterSLAViolated(SLAViolatedEvent event) {}

    /**
     * This listener method is invoked when a signal is sent
     * @param event
     */
    default void onSignal(SignalEvent event) {}

    /**
     * This listener is triggered when a migration occurs
     * @param event
     */
    default void onMigration(ProcessMigrationEvent event) {}

    /**
     * This listener method is invoked when a message is sent
     * @param event
     */
    default void onMessage(MessageEvent event) {}

    /**
     * This listener method is invoked when an error is captured
     * @param event
     */
    default void onError(ErrorEvent event) {}

    /**
     * This listener method is invoked immediately after a Process Instance is modified.
     * @param event
     */
    default void onProcessStateChanged(ProcessStateChangeEvent event) {}

    /**
     * This listener method is invoked immediately after a node in a Process Instance is modified.
     * @param event
     */
    default void onNodeStateChanged(ProcessNodeStateChangeEvent event) {}
}

All process events extend the base ProcessEvent interface:

public interface ProcessEvent extends KieRuntimeEvent {
    ProcessInstance getProcessInstance();       // The process instance this event relates to
    Date getEventDate();                        // When the event was created
    default String getEventIdentity() {         // Identity that performed the event
        return null;
    }
}

The methods of the event class can be used to retrieve other classes that contain all information about the entities involved in the event. The following example is a part of a node-related event, such as afterNodeLeft(), and retrieves the Process Instance and node type.

WorkflowProcessInstance processInstance = event.getNodeInstance().getProcessInstance();
NodeType nodeType = event.getNodeInstance().getNode().getNodeType();

When a process variable changes, both the old and new values are available in the event object. The following example shows how to access both values:

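import org.kie.api.event.process.ProcessVariableChangedEvent;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;

import jakarta.enterprise.context.ApplicationScoped;

// Extends the default listener so that only the callback of interest needs to be overridden.
@ApplicationScoped
public class MyProcessEventListener extends DefaultKogitoProcessEventListener {

    @Override
    public void afterVariableChanged(ProcessVariableChangedEvent event) {
        String variableId = event.getVariableId();
        Object oldValue = event.getOldValue();   // value before the change
        Object newValue = event.getNewValue();   // value after the change

        System.out.println("Variable '" + variableId + "' changed from "
            + oldValue + " to " + newValue);
    }

    // Override other callbacks as needed; the base class supplies empty implementations.
}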

Timing of calls to the Process Event Listener

Many event listener calls come in before and after pairs, for example, beforeNodeLeft() and afterNodeLeft(). The before and after event calls typically act like a stack. If event A directly causes event B, the following sequence of calls happens:

  1. Before A

  2. Before B

  3. After B

  4. After A

For example, if leaving node X triggers node Y, all event calls related to triggering node Y occur between the beforeNodeLeft and afterNodeLeft calls for node X. In the same way, if starting a process directly causes some nodes to start, all nodeTriggered and nodeLeft event calls occur between the beforeProcessStarted and afterProcessStarted calls.

This approach reflects cause and effect relationships between events. However, the timing and order of after event calls are not always intuitive. For example, an afterProcessStarted call can happen after the afterNodeLeft calls for some nodes in the process.

In general, to be notified when a particular event occurs, use the before call for the event. Use an after call only if you want to make sure that all processing related to this event has ended, for example, when you want to be notified when all steps associated with starting a particular Process Instance have been completed.

Depending on the type of node, some nodes might only generate nodeLeft calls and others might only generate nodeTriggered calls. For example, catch intermediate event nodes do not generate nodeTriggered calls because they are not triggered by another process node. Similarly, throw intermediate event nodes do not generate nodeLeft calls because these nodes do not have an outgoing connection to another node.
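The stack-like ordering of before and after calls can be reproduced with nested invocations. This is a self-contained sketch that does not use the Process Engine: NestedEventOrderSketch and fire are hypothetical names, and each event is represented only by its label.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation only: hypothetical types, not the Process Engine API.
class NestedEventOrderSketch {

    private final List<String> calls = new ArrayList<>();

    // Fires an event. The effects argument runs the events that this event directly causes,
    // so their before and after calls land between this event's own before and after calls.
    void fire(String name, Runnable effects) {
        calls.add("before " + name);
        effects.run();
        calls.add("after " + name);
    }

    List<String> calls() {
        return calls;
    }

    public static void main(String[] args) {
        NestedEventOrderSketch sketch = new NestedEventOrderSketch();
        // Event A directly causes event B, which causes nothing further.
        sketch.fire("A", () -> sketch.fire("B", () -> {}));
        sketch.calls().forEach(System.out::println);
    }
}
```

The same nesting explains why an after call for an outer event, such as afterProcessStarted, can arrive later than the after calls of the nodes it triggered.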

The User Task Event Listener Interface

You can develop a class that implements the UserTaskEventListener interface. This class can listen to events related to the lifecycle of user tasks, such as assignment of an owner, completion of a task, or changes to task variables. User task event listeners use a single callback method pattern where the event object contains both the old and new states. This design provides access to both the previous state and the current state within a single event handler.

The following source code shows the different methods of the UserTaskEventListener interface which you can implement to process the corresponding event:

package org.kie.kogito.usertask;

import org.kie.kogito.usertask.events.UserTaskAssignmentEvent;
import org.kie.kogito.usertask.events.UserTaskAttachmentEvent;
import org.kie.kogito.usertask.events.UserTaskCommentEvent;
import org.kie.kogito.usertask.events.UserTaskDeadlineEvent;
import org.kie.kogito.usertask.events.UserTaskStateEvent;
import org.kie.kogito.usertask.events.UserTaskVariableEvent;

public interface UserTaskEventListener {

    /**
     * This listener method is invoked when a user task deadline event occurs.
     * @param event
     */
    default void onUserTaskDeadline(UserTaskDeadlineEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a user task state changes.
     * The event contains both the old status and new status.
     * @param event
     */
    default void onUserTaskState(UserTaskStateEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a user task assignment changes.
     * The event contains both the old users and new users.
     * @param event
     */
    default void onUserTaskAssignment(UserTaskAssignmentEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a user task input variable changes.
     * The event contains both the old value and new value.
     * @param event
     */
    default void onUserTaskInputVariable(UserTaskVariableEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a user task output variable changes.
     * The event contains both the old value and new value.
     * @param event
     */
    default void onUserTaskOutputVariable(UserTaskVariableEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when an attachment is added to a user task.
     * @param event
     */
    default void onUserTaskAttachmentAdded(UserTaskAttachmentEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when an attachment is deleted from a user task.
     * @param event
     */
    default void onUserTaskAttachmentDeleted(UserTaskAttachmentEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when an attachment is modified in a user task.
     * @param event
     */
    default void onUserTaskAttachmentChange(UserTaskAttachmentEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a comment is modified in a user task.
     * @param event
     */
    default void onUserTaskCommentChange(UserTaskCommentEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a comment is added to a user task.
     * @param event
     */
    default void onUserTaskCommentAdded(UserTaskCommentEvent event) {
        // nothing
    }

    /**
     * This listener method is invoked when a comment is deleted from a user task.
     * @param event
     */
    default void onUserTaskCommentDeleted(UserTaskCommentEvent event) {
        // nothing
    }
}

The event objects passed to the listener methods provide access to both old and new values, as shown in the following code.

UserTaskStateEvent

public interface UserTaskStateEvent extends UserTaskEvent {
    UserTaskState getNewStatus();  // Current state
    UserTaskState getOldStatus();  // Previous state
}

UserTaskAssignmentEvent

public interface UserTaskAssignmentEvent extends UserTaskEvent {
    String getAssignmentType();
    String[] getNewUsersId();  // New assigned users
    String[] getOldUsersId();  // Previously assigned users
}

UserTaskVariableEvent

public interface UserTaskVariableEvent extends UserTaskEvent {
    enum VariableEventType {
        INPUT,
        OUTPUT
    }

    String getVariableName();
    Object getOldValue();  // Previous value
    Object getNewValue();  // Current value
    VariableEventType getVariableType();
}

UserTaskEvent (base interface)

public interface UserTaskEvent {
    UserTask getUserTask();
    UserTaskInstance getUserTaskInstance();
    Date getEventDate();
    String getEventUser();
}

The following example shows how you can access old and new values in User Task events:

@ApplicationScoped
public class MyUserTaskEventListener implements UserTaskEventListener {

    @Override
    public void onUserTaskState(UserTaskStateEvent event) {
        String taskName = event.getUserTask().getTaskName();
        UserTaskState oldStatus = event.getOldStatus();
        UserTaskState newStatus = event.getNewStatus();

        System.out.println("Task '" + taskName + "' changed from "
            + (oldStatus != null ? oldStatus.getName() : "null")
            + " to " + newStatus.getName());
    }

    @Override
    public void onUserTaskInputVariable(UserTaskVariableEvent event) {
        String variableName = event.getVariableName();
        Object oldValue = event.getOldValue();
        Object newValue = event.getNewValue();

        System.out.println("Task input variable '" + variableName + "' changed from "
            + oldValue + " to " + newValue);
    }

    @Override
    public void onUserTaskAssignment(UserTaskAssignmentEvent event) {
        String[] oldUsers = event.getOldUsersId();
        String[] newUsers = event.getNewUsersId();
        String taskName = event.getUserTask().getTaskName();

        System.out.println("Task '" + taskName + "' assignment changed from "
            + (oldUsers != null ? String.join(",", oldUsers) : "null") + " to "
            + (newUsers != null ? String.join(",", newUsers) : "null"));
    }
}

Because UserTaskEventListener uses single callback methods in which the event object contains both the old and new values, there are no paired before and after calls, and the timing considerations described for ProcessEventListener do not apply.

Registration of event listeners

Event listeners are registered using Quarkus Contexts and Dependency Injection (CDI). The runtime automatically discovers and registers any classes that implement the ProcessEventListener or UserTaskEventListener interfaces and that are annotated with an appropriate CDI scope annotation.

CDI-based registration works with both Quarkus and Spring Boot runtimes. For Spring Boot, ensure that your listener classes are within the component scan path.

To register an event listener, create a class that implements the desired listener interface and annotate it with @ApplicationScoped to ensure a single instance of your listener is shared across the application.

The runtime automatically discovers all beans that implement the listener interface, and your listener is invoked when events occur. No additional configuration is required.

The following example shows how to register a ProcessEventListener:

package org.acme;

import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessNodeLeftEvent;
import org.kie.api.event.process.ProcessNodeTriggeredEvent;
import org.kie.api.event.process.ProcessStartedEvent;
import org.kie.api.event.process.ProcessVariableChangedEvent;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyProcessEventListener extends DefaultKogitoProcessEventListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyProcessEventListener.class);

    @Override
    public void beforeProcessStarted(ProcessStartedEvent event) {
        LOGGER.info("beforeProcessStarted: processId={}",
                event.getProcessInstance().getProcessId());
    }

    @Override
    public void afterProcessStarted(ProcessStartedEvent event) {
        LOGGER.info("afterProcessStarted: processId={}, state={}",
                event.getProcessInstance().getProcessId(),
                event.getProcessInstance().getState());
    }

    @Override
    public void beforeProcessCompleted(ProcessCompletedEvent event) {
        LOGGER.info("beforeProcessCompleted: processId={}",
                event.getProcessInstance().getProcessId());
    }

    @Override
    public void afterProcessCompleted(ProcessCompletedEvent event) {
        LOGGER.info("afterProcessCompleted: processId={}, state={}",
                event.getProcessInstance().getProcessId(),
                event.getProcessInstance().getState());
    }

    @Override
    public void beforeNodeTriggered(ProcessNodeTriggeredEvent event) {
        LOGGER.info("beforeNodeTriggered: nodeName={}",
                event.getNodeInstance().getNodeName());
    }

    @Override
    public void afterNodeTriggered(ProcessNodeTriggeredEvent event) {
        LOGGER.info("afterNodeTriggered: nodeName={}",
                event.getNodeInstance().getNodeName());
    }

    @Override
    public void beforeNodeLeft(ProcessNodeLeftEvent event) {
        LOGGER.info("beforeNodeLeft: nodeName={}",
                event.getNodeInstance().getNodeName());
    }

    @Override
    public void afterNodeLeft(ProcessNodeLeftEvent event) {
        LOGGER.info("afterNodeLeft: nodeName={}",
                event.getNodeInstance().getNodeName());
    }

    @Override
    public void beforeVariableChanged(ProcessVariableChangedEvent event) {
        LOGGER.info("beforeVariableChanged: variableId={}, OLD_VALUE={}, NEW_VALUE={}",
                event.getVariableId(),
                event.getOldValue(),
                event.getNewValue());
    }

    @Override
    public void afterVariableChanged(ProcessVariableChangedEvent event) {
        LOGGER.info("afterVariableChanged: variableId={}, OLD_VALUE={}, NEW_VALUE={}",
                event.getVariableId(),
                event.getOldValue(),
                event.getNewValue());
    }
}

Extending DefaultKogitoProcessEventListener is recommended for ProcessEventListener implementations. This base class provides empty default implementations for all methods, enabling you to override only the methods you need.

The following example shows how to register a UserTaskEventListener:

package org.acme;

import org.kie.kogito.usertask.UserTaskEventListener;
import org.kie.kogito.usertask.events.UserTaskAssignmentEvent;
import org.kie.kogito.usertask.events.UserTaskStateEvent;
import org.kie.kogito.usertask.events.UserTaskVariableEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyUserTaskEventListener implements UserTaskEventListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyUserTaskEventListener.class);

    @Override
    public void onUserTaskState(UserTaskStateEvent event) {
        LOGGER.info("onUserTaskState: taskName={}, OLD_STATUS={}, NEW_STATUS={}",
                event.getUserTaskInstance().getTaskName(),
                event.getOldStatus(),
                event.getNewStatus());
    }

    @Override
    public void onUserTaskAssignment(UserTaskAssignmentEvent event) {
        LOGGER.info("onUserTaskAssignment: taskName={}, OLD_USERS={}, NEW_USERS={}",
                event.getUserTaskInstance().getTaskName(),
                event.getOldUsersId() != null ? String.join(",", event.getOldUsersId()) : "null",
                event.getNewUsersId() != null ? String.join(",", event.getNewUsersId()) : "null");
    }

    @Override
    public void onUserTaskInputVariable(UserTaskVariableEvent event) {
        LOGGER.info("onUserTaskInputVariable: variableName={}, OLD_VALUE={}, NEW_VALUE={}",
                event.getVariableName(),
                event.getOldValue(),
                event.getNewValue());
    }

    @Override
    public void onUserTaskOutputVariable(UserTaskVariableEvent event) {
        LOGGER.info("onUserTaskOutputVariable: variableName={}, OLD_VALUE={}, NEW_VALUE={}",
                event.getVariableName(),
                event.getOldValue(),
                event.getNewValue());
    }
}

Unlike ProcessEventListener, the UserTaskEventListener interface provides default empty implementations for all methods, so you can implement the interface directly without needing a base class.

For a complete working example demonstrating event listener registration and usage, see the event-listeners-quarkus-example in the BAMOE examples repository Getting Started Example Business Service projects.

This example includes:

  • TestProcessEventListener.java - A complete ProcessEventListener implementation

  • TestUserTaskEventListener.java - A complete UserTaskEventListener implementation

  • A sample Hiring process that demonstrates all lifecycle events

Accessing Complete Task Data via UserTaskInstance

The UserTaskInstance object, accessible via event.getUserTaskInstance(), contains all task data in a single object. This makes it easy to integrate with external systems (e.g., Elasticsearch, Kafka) or build audit trails.

The following example shows how to capture complete task data:

@ApplicationScoped
public class ExternalSystemUserTaskListener implements UserTaskEventListener {

    @Override
    public void onUserTaskState(UserTaskStateEvent event) {
        UserTaskInstance task = event.getUserTaskInstance();

        // All data available in one object:
        task.getId();                // Task identifier
        task.getTaskName();          // Task name
        task.getStatus().getName();  // Current status
        task.getActualOwner();       // Assigned user
        task.getPotentialUsers();    // Potential owners (users)
        task.getPotentialGroups();   // Potential owners (groups)
        task.getExcludedUsers();     // Excluded users
        task.getAdminUsers();        // Business administrators (users)
        task.getAdminGroups();       // Business administrators (groups)
        task.getInputs();            // Input variables (Map)
        task.getOutputs();           // Output variables (Map)
        task.getAttachments();       // Task attachments
        task.getComments();          // Task comments
        task.getMetadata();          // Task metadata
        task.getSlaDueDate();        // SLA due date

        // Process context
        ProcessInfo info = task.getProcessInfo();
        info.getProcessInstanceId();
        info.getProcessId();
        info.getProcessVersion();

        // State transition info from event
        event.getOldStatus();        // Previous state
        event.getNewStatus();        // Current state
        event.getEventDate();        // When it happened
        event.getEventUser();        // Who triggered it
    }
}

Alternatively, you can serialize the entire UserTaskInstance to JSON with ObjectMapper and send it to external systems, as shown below:

@ApplicationScoped
public class JsonExportUserTaskListener implements UserTaskEventListener {

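    // Logger used below; declared the same way as in the earlier listener examples
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonExportUserTaskListener.class);

    @Inject
    ObjectMapper mapper;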

    @Override
    public void onUserTaskState(UserTaskStateEvent event) {
        try {
            UserTaskInstance task = event.getUserTaskInstance();
            String json = mapper.writeValueAsString(task);
            LOGGER.info("Task JSON: {}", json);
            // Send to Elasticsearch, Kafka, etc.
        } catch (JsonProcessingException e) {
            LOGGER.error("Error serializing task", e);
        }
    }
}

This produces JSON output similar to the following:

{
  "id": "71a23b77-98ee-4a57-ad6f-f532b39630c7",
  "userTaskId": "_B8C4F63C-81AD-4291-9C1B-84967277EEF6",
  "processInfo": {
    "processInstanceId": "18fc537b-567a-4904-9a26-1094f4e29822",
    "processId": "hiring",
    "processVersion": "1.0"
  },
  "status": { "terminate": null, "name": "Ready" },
  "actualOwner": "jdoe",
  "taskName": "HRInterview",
  "potentialUsers": ["jdoe"],
  "potentialGroups": ["hr-manager"],
  "adminUsers": [],
  "adminGroups": [],
  "excludedUsers": [],
  "inputs": {
    "offer": { "category": "Senior Software Engineer", "salary": 40450 },
    "candidate": { "name": "Jon", "lastName": "Snow", "email": "jon@snow.org", "experience": 5, "skills": ["Java", "Kogito", "Fencing"] },
    "approve": false
  },
  "outputs": {},
  "attachments": [],
  "comments": [],
  "metadata": { "ProcessType": "BPMN", "Lifecycle": "kogito", "Skippable": "false" }
}

The following table lists all the fields available in UserTaskInstance:

Category Fields

Task Identity

id, taskName, taskDescription, taskPriority

Assignments

potentialUsers, potentialGroups, excludedUsers, adminUsers, adminGroups, actualOwner

Data

inputs (Map), outputs (Map)

State

status (name + terminate)

Process Context

processInfo.processId, processInfo.processInstanceId, processInfo.processVersion, processInfo.rootProcessId, processInfo.rootProcessInstanceId

Metadata

metadata (ProcessType, Lifecycle, Skippable, NodeInstanceId, ProcessInstanceState)

Timestamps

slaDueDate, eventDate (from event)

Social

attachments, comments

Best Practices for EventListener Development

The Process Engine calls event listeners during the processing of events or tasks. The calls happen within Process Engine transactions and block execution. The following guidelines will help you develop event listeners that are reliable and efficient:

  1. Keep actions short

    Event listeners execute synchronously within the transaction. Long-running operations block process execution.

  2. Do not rely on listener execution order

    Multiple listeners may be registered and their execution order is not guaranteed.

  3. Avoid external calls (REST, messaging) inside listeners

    Use process nodes for such calls instead. An exception is logging, which should be kept as simple as possible.

  4. You can modify process/task state

    A listener can change variables of the process or task involved in the event, or interact with the Process Engine (e.g., send signals).

  5. Handle exceptions gracefully, always wrapping external calls in try-catch blocks

    Note
    Event listeners execute within the same transaction as the task or process operation. If your listener throws an uncaught exception, it will roll back the entire transaction — including the user task completion or process state change.

The following example shows how to handle exceptions in a User Task event listener:

@Override
public void onUserTaskState(UserTaskStateEvent event) {
    try {
        // External service call
        externalService.notify(event.getUserTaskInstance());
    } catch (Exception e) {
        LOGGER.error("Error in listener: {}", e.getMessage(), e);
        // Don't rethrow — allow the transaction to complete
    }
}

The same pattern applies to Process event listeners:

@Override
public void afterVariableChanged(ProcessVariableChangedEvent event) {
    try {
        auditService.recordChange(event);
    } catch (Exception e) {
        LOGGER.error("Error in listener: {}", e.getMessage(), e);
    }
}
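
Guidelines 1, 3, and 5 can be combined by letting the listener enqueue a small snapshot and return immediately, leaving the slow call to a background worker. The following is a minimal sketch in plain Java, not BAMOE API: ListenerHandoff, TaskSnapshot, and the sink are hypothetical names, and onTaskState stands in for a listener callback such as onUserTaskState. Work handed off this way runs outside the engine transaction, so it may still run if that transaction later rolls back.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical immutable snapshot of the data the listener wants to forward.
record TaskSnapshot(String taskId, String status) {}

// Hypothetical helper: the listener enqueues and returns; a worker thread does the slow call.
final class ListenerHandoff implements AutoCloseable {

    private final BlockingQueue<TaskSnapshot> queue;
    private final Thread worker;
    private volatile boolean running = true;

    ListenerHandoff(int capacity, Consumer<TaskSnapshot> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(() -> drain(sink), "listener-handoff");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    // Stand-in for a listener callback: it never blocks the caller.
    // offer() returns false instead of waiting when the queue is full.
    boolean onTaskState(String taskId, String status) {
        return queue.offer(new TaskSnapshot(taskId, status));
    }

    private void drain(Consumer<TaskSnapshot> sink) {
        while (running || !queue.isEmpty()) {
            try {
                TaskSnapshot next = queue.poll(50, TimeUnit.MILLISECONDS);
                if (next != null) {
                    sink.accept(next); // the slow external call happens here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A failing sink must not stop the worker; report and continue.
                System.err.println("Sink failed: " + e.getMessage());
            }
        }
    }

    // Stops the worker after the items already queued have been delivered.
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A bounded queue is used so that a slow destination cannot exhaust memory; when onTaskState returns false the caller can decide whether to drop the snapshot or log it.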

Publishing engine events to an external read model

The ProcessEventListener and UserTaskEventListener interfaces described above run inside the engine transaction. They are the right choice for in-process reactions such as logging, validation, or sending a signal back to the engine.

When the goal is to forward the same engine events to an external read model - for example an Elasticsearch index that powers a custom dashboard or that an internal consumer queries directly - use the org.kie.kogito.event.EventPublisher interface. The engine batches the events produced during each command and delivers the batch to every registered publisher when the command completes successfully. If the command throws an exception, the unit of work is aborted, the batch is discarded, and the publisher is not invoked, so a publisher never sees events from a failed engine command.

The Kafka-based Process event add-on described earlier in this page is itself a registered EventPublisher. A custom publisher coexists with it transparently - both fire on every committed batch, and each writes to its own destination.
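
The delivery rule described above can be pictured with a small model. This is an illustrative sketch of the described behavior in plain Java, not the engine's implementation: UnitOfWorkSketch, register, emit, and execute are hypothetical names, and events are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "batch during the command, deliver only on success".
final class UnitOfWorkSketch {

    private final List<Consumer<Collection<String>>> publishers = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    void register(Consumer<Collection<String>> publisher) {
        publishers.add(publisher);
    }

    // Called while a command runs; nothing is delivered yet.
    void emit(String event) {
        pending.add(event);
    }

    // Runs one command. On success every registered publisher receives the
    // whole batch; on failure the batch is discarded and no publisher runs.
    void execute(Runnable command) {
        pending.clear();
        try {
            command.run();
        } catch (RuntimeException e) {
            pending.clear(); // abort: drop the events of the failed command
            throw e;
        }
        List<String> batch = List.copyOf(pending);
        pending.clear();
        publishers.forEach(p -> p.accept(batch));
    }
}
```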

The EventPublisher interface

The interface is published in the package org.kie.kogito.event and declares two methods:

public interface EventPublisher {
    void publish(DataEvent<?> event);
    void publish(Collection<DataEvent<?>> events);
}

Implement both methods. Which one carries the work depends on the style you want:

  • Granular style. Write one downstream record per engine event. Put your logic in publish(DataEvent<?>); from publish(Collection<DataEvent<?>>) loop over the batch and call publish(DataEvent<?>) for each event. Use this style when each engine event maps directly to a row or document in your destination.

  • Aggregated style. Group multiple events from the same engine commit into one downstream record. Put your logic in publish(Collection<DataEvent<?>>). Use this style when you want, for example, one rich document per User Task transition that combines state, variable, and assignment events from the same commit.
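
The difference between the two styles can be sketched with stand-in types. The EngineEvent record below is a hypothetical substitute for DataEvent<?> that carries only a task identifier, a type, and a value; a real publisher would read these from the event body. PublishStyles and its methods are likewise illustrative names.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for DataEvent<?>: just enough fields to group on.
record EngineEvent(String taskId, String type, String value) {}

final class PublishStyles {

    // Granular style: one downstream record per engine event.
    static List<String> granular(Collection<EngineEvent> batch) {
        return batch.stream()
                .map(e -> e.taskId() + "|" + e.type() + "=" + e.value())
                .collect(Collectors.toList());
    }

    // Aggregated style: one downstream record per task, combining all
    // events for that task from the same batch (insertion order is kept).
    static Map<String, Map<String, String>> aggregated(Collection<EngineEvent> batch) {
        Map<String, Map<String, String>> byTask = new LinkedHashMap<>();
        for (EngineEvent e : batch) {
            byTask.computeIfAbsent(e.taskId(), id -> new LinkedHashMap<>())
                  .put(e.type(), e.value());
        }
        return byTask;
    }
}
```

In this sketch a batch with a state event and an assignment event for the same task yields two granular records but a single aggregated record for that task.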

Auto-registration

A class that implements EventPublisher and is marked as a CDI bean (Quarkus) or a Spring bean (Spring Boot) is discovered and registered by the engine automatically. No manual wiring is required:

package org.acme;

import java.util.Collection;

import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped // Quarkus. Use @Component on Spring Boot.
public class MyExternalPublisher implements EventPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyExternalPublisher.class);

    @Override
    public void publish(DataEvent<?> event) {
        // per-event logic - write one record to the external destination
        LOGGER.info("Engine event: type={}, data={}", event.getType(), event.getData());
    }

    @Override
    public void publish(Collection<DataEvent<?>> events) {
        // per-batch logic - granular publishers simply loop, aggregated publishers merge
        events.forEach(this::publish);
    }
}

Multiple publishers can coexist: every registered EventPublisher receives every committed batch.

Event types delivered to a publisher

The same data event types that the Process event add-on publishes to Kafka are delivered to a custom publisher. The ones most useful for an external read model are:

Event type Sent when

ProcessInstanceStateDataEvent

A Process Instance changes lifecycle state.

ProcessInstanceVariableDataEvent

A Process Instance variable is set or changed.

UserTaskInstanceStateDataEvent

A User Task changes state (Created, Ready, Reserved, Completed, and so on).

UserTaskInstanceAssignmentDataEvent

The set of users or groups assigned to a User Task changes.

UserTaskInstanceVariableDataEvent

A User Task input or output variable is set or changed.

Each event exposes getType(), which returns the event type name as a string, and getData(), which returns the typed event body — for example ProcessInstanceStateEventBody or UserTaskInstanceStateEventBody. An implementation typically switches on getType() to dispatch to per-type logic.
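
A dispatch of this kind might look like the following sketch. It assumes that getType() returns the names listed in the table above, which should be checked against the events your runtime actually emits. TypedEvent is a hypothetical stand-in for DataEvent<?> so that the snippet compiles without the BAMOE libraries, and the destination names are illustrative.

```java
// Hypothetical stand-in for DataEvent<?> exposing the two accessors described above.
record TypedEvent(String type, Object data) {
    String getType() { return type; }
    Object getData() { return data; }
}

final class TypeDispatch {

    // Routes an event to a destination name based on its type string.
    // The case labels are assumed to match getType(); adjust them if they differ.
    static String route(TypedEvent event) {
        if (event.getType() == null) {
            return "ignored";
        }
        switch (event.getType()) {
            case "ProcessInstanceStateDataEvent":
            case "ProcessInstanceVariableDataEvent":
                return "process-index";
            case "UserTaskInstanceStateDataEvent":
            case "UserTaskInstanceAssignmentDataEvent":
            case "UserTaskInstanceVariableDataEvent":
                return "task-index";
            default:
                return "ignored"; // other event types are skipped in this sketch
        }
    }
}
```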

The DataEvent objects passed to a publisher are the same in-memory representations the Kafka add-on consumes. The Kafka add-on serializes them to the CloudEvents JSON format before publishing; a custom publisher is free to work with the typed event objects directly, or to apply its own serialization.

Filtering non-transition state events

During User Task initialization, the engine emits several UserTaskInstanceStateDataEvent instances where the old and new state are the same (for example, Created → Created). The engine labels these non-transition events with eventType = "Modify" on the event body, so a publisher that is interested only in real lifecycle transitions can skip them with a single check:

UserTaskInstanceStateEventBody body = ((UserTaskInstanceStateDataEvent) event).getData();
if (!"Modify".equals(body.getEventType())) {
    // real transition: Ready, Reserved, Completed, and so on
    writeToDestination(body);
}

The aggregated reference publisher in the example uses the same filter after merging the batch by userTaskInstanceId.

Reference example

For complete working examples demonstrating EventPublisher implementations, see the process-event-listeners-quarkus and process-event-listeners-springboot examples in the BAMOE examples repository Getting Started Example Business Service projects.

These examples include:

  • ElasticsearchEventPublisher.java - a granular per-event publisher implementation

  • HumanTaskChangedPublisher.java - an aggregated per-transition publisher implementation

  • A sample Hiring process that demonstrates both publishers writing to Elasticsearch