Custom operations

Custom operations let you customize event processing. You define JSLT scripts that operate on correlated events and produce an intermediary event called a summary. Summaries can be sent to egresses and monitored.

About this task

When a new business event is received:

  • The summary is retrieved from the state database by finding the summary whose correlationId field matches the value of the event's correlation path.
  • In the aggregation transformation, the summary is enriched with the data from the current event.
  • The enriched summary is stored in the state database.

The summary completes when the correlated input event matches the completion condition specified in the custom operation definition.
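
The retrieve, enrich, and store cycle described above can be sketched in a few lines of Python. This is an illustration only, not the product's implementation: the in-memory dictionary stands in for the state database, and process_event, merge, and the callback parameters are hypothetical names chosen for this sketch.

```python
from typing import Callable, Dict, Optional, Tuple

Summary = dict
Event = dict


def process_event(
    state: Dict[str, Summary],
    event: Event,
    correlation_path: Callable[[Event], str],
    aggregate: Callable[[Optional[Summary], Event], Summary],
    is_complete: Callable[[Event], bool],
) -> Tuple[Summary, bool]:
    """Run one retrieve / enrich / store cycle and report completion."""
    correlation_id = correlation_path(event)
    summary = state.get(correlation_id)    # retrieve (None for the first event)
    enriched = aggregate(summary, event)   # enrich with the current event
    state[correlation_id] = enriched       # store the enriched summary
    return enriched, is_complete(event)


def merge(summary: Optional[Summary], event: Event) -> Summary:
    """Toy aggregation: remember every status seen for this correlation ID."""
    statuses = (summary or {}).get("statuses", []) + [event["status"]]
    return {"correlationId": event["id"], "statuses": statuses}


state: Dict[str, Summary] = {}  # stand-in for the state database
for status in ("ORDER_CREATED", "ORDER_PROCESSED", "ORDER_DONE"):
    summary, done = process_event(
        state,
        {"id": "order_1", "status": status},
        correlation_path=lambda e: e["id"],
        aggregate=merge,
        is_complete=lambda e: e["status"] == "ORDER_DONE",
    )

print(summary["statuses"], done)
```

After the third event, the summary holds all three statuses and the completion check returns True, because the event that was just processed matches the completion condition.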

Within the custom operation, you can also filter events with in and out selectors. The inSelector filters input events, and the outSelector determines whether aggregated outputs are sent to egresses.
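
The effect of the two selectors can be pictured with the following Python sketch. It is a conceptual model under stated assumptions: run_with_selectors and count_events are hypothetical names, and the sketch assumes that the out selector is evaluated against the aggregated output, which the text above implies but does not spell out. It does not show the configuration syntax of inSelector or outSelector.

```python
from typing import Callable, List, Optional


def run_with_selectors(
    events: List[dict],
    in_selector: Callable[[dict], bool],
    out_selector: Callable[[dict], bool],
    aggregate: Callable[[Optional[dict], dict], dict],
) -> List[dict]:
    """Return the aggregated outputs that would be sent to egresses."""
    summary: Optional[dict] = None
    emitted: List[dict] = []
    for event in events:
        if not in_selector(event):
            continue                       # rejected before aggregation
        summary = aggregate(summary, event)
        if out_selector(summary):          # assumption: applied to the output
            emitted.append(dict(summary))
    return emitted


def count_events(summary: Optional[dict], event: dict) -> dict:
    """Toy aggregation: count accepted events and keep the latest status."""
    return {"count": (summary or {}).get("count", 0) + 1, "state": event["status"]}


emitted = run_with_selectors(
    [
        {"kind": "order", "status": "ORDER_CREATED"},
        {"kind": "invoice", "status": "INVOICE_SENT"},  # dropped by in_selector
        {"kind": "order", "status": "ORDER_DONE"},
    ],
    in_selector=lambda e: e["kind"] == "order",
    out_selector=lambda s: s["state"] == "ORDER_DONE",
    aggregate=count_events,
)
print(emitted)  # [{'count': 2, 'state': 'ORDER_DONE'}]
```

In this model the invoice event never reaches the aggregation, and the intermediate summary built from the first order event is aggregated but not emitted.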

The following table lists all the variables available in the operation transformation JSLT script. The runtime sets the values automatically, and you access them with the $<Variable> syntax.

Table 1. Variables available in the JSLT script

Variable                     Description
name                         The operation name
businessEvent                The current event
summary                      The summary stored in the state
correlationId                The correlation identifier of an execution context
isBusinessEventMoreRecent    A boolean value indicating whether the current event is more recent than the summary stored in the state
timestamp                    The timestamp of the current event
timestampMillis              The timestamp of the current event in milliseconds

Procedure

  1. Create a folder named test-app that contains the following files:
    • processing-conf.json file defining a Kafka ingress, a context with a custom operation, and two egresses (OpenSearch and Kafka).

      {
        "ingresses": [
          {
            "uid": "I0001",
            "type": "kafka",
            "topic": "icp4ba-bai-orders"
          }
        ],
        "contexts": [
          {
            "name": "orders",
            "correlationPath": {
              "type": "jslt-inline",
              "expression": ".id"
            },
            "operations": [
              {
                "uid": "C0001",
                "type": "custom",
                "name": "order-process",
                "conditions": {
                  "completion": {
                    "type": "jslt-inline",
                    "expression": "test(.status, \"ORDER_DONE\")"
                  },
                  "aggregationCompletion": {
                    "type": "jslt-inline",
                    "expression": "test(.status, \"ORDER_DONE\") and (.startTimestamp != null)"
                  },
                  "delay": 1000
                },
                "aggregation": {
                  "type": "jslt-file",
                  "filename": "summary-order.jslt"
                },
                "egressRefs": ["opensearch-orders", "kafka-orders"]
              }
            ]
          }
        ],
        "egresses": [
          {
            "uid": "E0001",
            "type": "opensearch",
            "name": "opensearch-orders",
            "index": {
              "name": "icp4ba-bai-orders",
              "uncompletedOperationsIndex": {
                "name": "icp4ba-bai-orders-in-progress"
              },
              "docId": {
                "type": "jslt-inline",
                "expression": ".correlationId"
              }
            }
          },
          {
            "uid": "E0002",
            "type": "kafka",
            "name": "kafka-orders",
            "topic": "icp4ba-bai-orders-kafka",
            "enabled": true
          }
        ],
        "settings": {
          "verboseLogs": true
        }
      }
    • summary-order.jslt file defining the computation applied to correlated events to generate the summaries.

      let iso8601Format = "yyyy-MM-dd'T'HH:mm:ss"
      def start-time()
        if (test($businessEvent.status, "ORDER_CREATED"))
          { "startTimestamp": $timestamp }
        else if ($summary.startTimestamp != null)
          { "startTimestamp": $summary.startTimestamp }
        else
          {}
      def end-time()
        if (test($businessEvent.status, "ORDER_DONE"))
          { "endTimestamp": $timestamp }
        else if ($summary.endTimestamp != null)
          { "endTimestamp": $summary.endTimestamp }
        else
          {}
      def duration()
        let startTime = get-key(start-time(), "startTimestamp", null)
        let endTime = get-key(end-time(), "endTimestamp", null)
        if ($startTime != null and $endTime != null)
          { "duration": floor(parse-time($endTime, $iso8601Format) - parse-time($startTime, $iso8601Format)) }
        else
          {}
      def state()
        if ($isBusinessEventMoreRecent)
          { "state": $businessEvent.status }
        else
          { "state": $summary.state }
      def timestamp()
        if ($isBusinessEventMoreRecent)
          { "timestamp": $timestamp }
        else
          { "timestamp": $summary.timestamp }
      def previous-data()
        if ($summary.data != null)
          $summary.data
        else
          {}
      def current-data()
        { for ($businessEvent.data) .key: .value }
      def aggregate-data()
        if ($isBusinessEventMoreRecent)
          { "data": current-data() + previous-data() }
        else
          { "data": previous-data() + current-data() }
      aggregate-data() + // data
      duration() +       // duration
      end-time() +       // endTimestamp
      start-time() +     // startTimestamp
      state() +          // state
      timestamp() +      // timestamp
      {
        "operationName": $name,
        "correlationId": $correlationId
      }
  2. Create a folder named test-data that contains the following three event files:
    • event-1-order-created.json:
      {
        "id": "order_1",
        "kind": "order",
        "seq": 1,
        "timestamp": "2022-04-01T00:22:12.000-04:00",
        "status": "ORDER_CREATED"
      }
    • event-2-order-processed.json:
      {
        "id": "order_1",
        "kind": "order",
        "seq": 2,
        "timestamp": "2022-04-03T00:13:31.000-04:00",
        "status": "ORDER_PROCESSED",
        "data": {
          "shipped": true
        }
      }
    • event-3-order-done.json:
      {
        "id": "order_1",
        "kind": "order",
        "seq": 3,
        "timestamp": "2022-04-07T00:05:42.000-04:00",
        "status": "ORDER_DONE",
        "data": {
          "paid": true
        }
      }
  3. Create a Processing Application.
    management-cli processing-app create --name="test-app"
  4. Import the configuration.
    management-cli processing-conf import --name="test-app" --directory="./test-app"
  5. Deploy the application.
    management-cli processing-app deploy --name="test-app"
  6. To check the result of the processing, watch the events that the application outputs. Run the following command in a dedicated terminal:
    management-cli kafka consumer-json --topic=icp4ba-bai-orders-kafka

    The following message appears: Retrieving events from topic icp4ba-bai-orders-kafka.

    Events that are output to the egress topic are displayed in this terminal.

  7. Send the first event.
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-1-order-created.json
    When the first event, event-1-order-created.json, is processed, the startTimestamp field is created and assigned the value of this event's timestamp:
    Event read (key=null partition=0, offset=138/139): {"operationName":"order-process","correlationId":"order_1","timestamp":"2022-04-01T00:22:12.000-04:00","state":"ORDER_CREATED","startTimestamp":"2022-04-01T00:22:12.000-04:00","data":{}}
  8. Send the second event.
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-2-order-processed.json
    When the second event is processed, the data that it contains is added to the summary, and the state in the summary is updated:
    Event read (key=null partition=0, offset=139/140): {"operationName":"order-process","correlationId":"order_1","timestamp":"2022-04-03T00:13:31.000-04:00","state":"ORDER_PROCESSED","startTimestamp":"2022-04-01T00:22:12.000-04:00","data":{"shipped":true}}
  9. Send the last event.
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-3-order-done.json

    The data of the last event is added to the summary, the endTimestamp field is assigned the value of its timestamp, and the duration is computed as the difference between the endTimestamp and the startTimestamp:

    Event read (key=null partition=0, offset=140/141): {"operationName":"order-process","correlationId":"order_1","timestamp":"2022-04-07T00:05:42.000-04:00","state":"ORDER_DONE","startTimestamp":"2022-04-01T00:22:12.000-04:00","endTimestamp":"2022-04-07T00:05:42.000-04:00","duration":517410,"data":{"shipped":true,"paid":true}}
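
The three summaries shown in steps 7 to 9 can be reproduced offline with a rough Python stand-in for summary-order.jslt. This is a sketch, not a substitute for the JSLT script: the function names are hypothetical, the more_recent flag replaces $isBusinessEventMoreRecent (which the runtime normally sets), and the data merge assumes that the left operand of the JSLT object + operator wins when a key appears on both sides. Unlike the script, the sketch parses the UTC offset of each timestamp. Because all three test events share the same offset, the difference is unaffected.

```python
import math
import re
from datetime import datetime


def seconds_between(start: str, end: str) -> int:
    """Elapsed seconds between two ISO 8601 timestamps, rounded down."""
    delta = datetime.fromisoformat(end) - datetime.fromisoformat(start)
    return math.floor(delta.total_seconds())


def aggregate(summary, event, more_recent=True):
    """Hypothetical Python stand-in for the aggregation in summary-order.jslt."""
    summary = summary or {}
    ts, status = event["timestamp"], event["status"]
    out = {"operationName": "order-process", "correlationId": event["id"]}
    # timestamp() and state(): the most recent event wins.
    out["timestamp"] = ts if more_recent else summary.get("timestamp")
    out["state"] = status if more_recent else summary.get("state")
    # start-time() and end-time(): set by a matching event, else carried over.
    start = ts if re.search("ORDER_CREATED", status) else summary.get("startTimestamp")
    end = ts if re.search("ORDER_DONE", status) else summary.get("endTimestamp")
    if start is not None:
        out["startTimestamp"] = start
    if end is not None:
        out["endTimestamp"] = end
    # duration(): only when both timestamps are known.
    if start is not None and end is not None:
        out["duration"] = seconds_between(start, end)
    # aggregate-data(): on key conflicts, the more recent side takes precedence.
    previous, current = summary.get("data", {}), event.get("data", {})
    out["data"] = {**previous, **current} if more_recent else {**current, **previous}
    return out


events = [
    {"id": "order_1", "timestamp": "2022-04-01T00:22:12.000-04:00",
     "status": "ORDER_CREATED"},
    {"id": "order_1", "timestamp": "2022-04-03T00:13:31.000-04:00",
     "status": "ORDER_PROCESSED", "data": {"shipped": True}},
    {"id": "order_1", "timestamp": "2022-04-07T00:05:42.000-04:00",
     "status": "ORDER_DONE", "data": {"paid": True}},
]

outputs, summary = [], None
for event in events:
    summary = aggregate(summary, event)
    outputs.append(summary)

print(outputs[-1]["duration"])  # 517410
```

The duration is six days (518400 seconds) minus the 16 minutes and 30 seconds (990 seconds) by which the end time of day precedes the start time of day, which gives 517410 seconds.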

What to do next

To remove all the resources after a deployment, see Cleaning up the Processing Application.