Managing event order within operations

This section describes how to use the delay property to handle the order of events when they are processed.

About this task

When processing a stream of events, it is not guaranteed that the order in which events arrive at the processing step is the same as the order in which the events occurred.

Consequently, out-of-order events can compromise the completeness of aggregation operations.

For instance, if an aggregation requires several input events to be received before its end condition is met, the operation may end prematurely when the end event arrives earlier than expected. In this case, the aggregation produces an incomplete result.

One way to overcome this problem is to use the delay property. The delay is the time, in milliseconds, that the processing application waits after the completion condition becomes true before it completes the operation and considers the summary complete. A good value for the delay is the upper bound of the event latency.
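To make the timing concrete, the following Python sketch models the delay property in a simplified way. It is a hypothetical model, not the processing application's implementation: the names process, Summary, and close_at are illustrative, events are assumed to be (arrival time in seconds, event) pairs sorted by arrival time, the completion condition is assumed to be status == "ORDER_DONE" as in the configuration of the procedure, and, as a simplification, the model checks whether the delay has elapsed only when the next event with the same correlation ID arrives.

```python
# Hypothetical model of the delay property; NOT the processing application's code.
# Assumptions: events are (arrival_time_s, event) pairs sorted by arrival time,
# the correlation path is .id, and the completion condition is
# status == "ORDER_DONE". The delay is checked lazily, when the next event
# with the same correlation ID arrives.
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Summary:
    correlation_id: str
    statuses: List[str] = field(default_factory=list)
    # Arrival time (s) from which the operation is complete; None until the
    # completion condition has become true.
    close_at: Optional[float] = None


def process(events: List[Tuple[float, dict]], delay_ms: int) -> List[Summary]:
    delay_s = delay_ms / 1000.0
    open_ops: Dict[str, Summary] = {}
    results: List[Summary] = []
    for arrival, event in events:
        key = event["id"]
        current = open_ops.get(key)
        # The delay has elapsed: complete the operation, so this event starts a new one.
        if current is not None and current.close_at is not None and arrival >= current.close_at:
            results.append(open_ops.pop(key))
            current = None
        if current is None:
            current = open_ops[key] = Summary(key)
        current.statuses.append(event["status"])
        # Start the delay the first time the completion condition is true.
        if event["status"] == "ORDER_DONE" and current.close_at is None:
            current.close_at = arrival + delay_s
    # Also report the operations that are still open at the end of the input.
    results.extend(open_ops.values())
    return results


created = {"id": "order_5", "status": "ORDER_CREATED"}
done = {"id": "order_5", "status": "ORDER_DONE"}
processed = {"id": "order_5", "status": "ORDER_PROCESSED"}

# Event 2 arrives 0.5 s after event 3: within a 10000 ms delay.
within = process([(0.0, created), (0.5, done), (1.0, processed)], delay_ms=10000)
print([s.statuses for s in within])
# [['ORDER_CREATED', 'ORDER_DONE', 'ORDER_PROCESSED']]

# Event 2 arrives 11 s after event 3: the delay has elapsed.
late = process([(0.0, created), (0.5, done), (11.5, processed)], delay_ms=10000)
print([s.statuses for s in late])
# [['ORDER_CREATED', 'ORDER_DONE'], ['ORDER_PROCESSED']]
```

In this model, with delay_ms=0 the operation completes as soon as the ORDER_DONE event arrives, so the late ORDER_PROCESSED event always starts a new summary, which is the premature ending described above.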

Procedure

  1. Create a folder named test-app that contains the following file:
    processing-conf.json: a file that defines a Kafka ingress, a context with a custom operation, and a Kafka egress.
    {
      "ingresses": [
        {
          "uid": "I0001",
          "type": "kafka",
          "topic": "icp4ba-bai-orders"
        }
      ],
      "contexts": [
        {
          "name": "orders",
          "correlationPath": {
            "type": "jslt-inline",
            "expression": ".id"
          },
          "operations": [
            {
              "uid": "C0001",
              "type": "custom",
              "name": "order-process",
              "conditions": {
                "completion": {
                  "type": "jslt-inline",
                  "expression": "test(.status, \"ORDER_DONE\")"
                },
                "aggregationCompletion": {
                  "type": "jslt-inline",
                  "expression": "test(.status, \"ORDER_DONE\") and (.startTimestamp != null)"
                },
                "delay": 10000
              },
              "aggregation": {
                "type": "jslt-file",
                "filename": "summary-order.jslt"
              },
              "egressRefs": [ "kafka-orders"]
            }
          ]
        }
      ],
      "egresses": [
        {
          "uid": "E0002",
          "type": "kafka",
          "name": "kafka-orders",
          "topic": "icp4ba-bai-orders-kafka",
          "enabled": true
        }
      ],
      "settings": {
        "verboseLogs": true
      }
    }
  2. Add a summary-order.jslt file to the test-app folder. This file defines the computation that is applied to correlated events to generate the summaries.
    let iso8601Format = "yyyy-MM-dd'T'HH:mm:ss"
    // startTimestamp: timestamp of the ORDER_CREATED event, else the value kept in the summary
    def start-time()
      if (test($businessEvent.status, "ORDER_CREATED"))
        { "startTimestamp": $timestamp }
      else if ($summary.startTimestamp != null)
        { "startTimestamp": $summary.startTimestamp }
      else
        {}
    // endTimestamp: timestamp of the ORDER_DONE event, else the value kept in the summary
    def end-time()
      if (test($businessEvent.status, "ORDER_DONE"))
        { "endTimestamp": $timestamp }
      else if ($summary.endTimestamp != null)
        { "endTimestamp": $summary.endTimestamp }
      else
        {}
    // duration: seconds between start and end, once both timestamps are known
    def duration()
      let startTime = get-key(start-time(), "startTimestamp", null)
      let endTime = get-key(end-time(), "endTimestamp", null)
      if ($startTime != null and $endTime != null)
        { "duration": floor(parse-time($endTime, $iso8601Format) - parse-time($startTime, $iso8601Format)) }
      else
        {}
    // state and timestamp follow the more recent of the business event and the summary
    def state()
      if ($isBusinessEventMoreRecent)
        { "state": $businessEvent.status }
      else
        { "state": $summary.state }
    def timestamp()
      if ($isBusinessEventMoreRecent)
        { "timestamp": $timestamp }
      else
        { "timestamp": $summary.timestamp }
    // data accumulated by the previous events of the summary
    def previous-data()
      if ($summary.data != null)
        $summary.data
      else
        {}
    // copy of the data carried by the current business event
    def current-data()
      { for ($businessEvent.data) .key: .value }
    // merge the current and previous data; the operand order depends on which is more recent
    def aggregate-data()
      if ($isBusinessEventMoreRecent)
        { "data" : current-data() + previous-data() }
      else
        { "data" : previous-data() + current-data() }
    aggregate-data() +  // data
    duration() +        // duration
    end-time() +        // endTimestamp
    start-time() +      // startTimestamp
    state() +           // state
    timestamp() +       // timestamp
    {
      "operationName": $name,
      "correlationId": $correlationId
    }
  3. Create a folder named test-data containing the following three events:
    • event-1-order-created.json:
      {"id": "order_5", "kind": "order", "timestamp": "2022-04-01T00:22:12.000-04:00", "status": "ORDER_CREATED"}
    • event-2-order-processed.json:
      {"id": "order_5", "kind": "order", "timestamp": "2022-04-03T00:13:31.000-04:00", "status": "ORDER_PROCESSED", "data": {"shipped": true}}
    • event-3-order-done.json:
      {"id": "order_5", "kind": "order", "timestamp": "2022-04-07T00:05:42.000-04:00", "status": "ORDER_DONE", "data": {"paid": true}}
  4. Create a processing application.
    management-cli processing-app create --name="test-app"
  5. Import the configuration.
    management-cli processing-conf import --name="test-app" --directory="./test-app"
  6. Deploy the application.
    management-cli processing-app deploy --name="test-app"
  7. To check the result of the processing, consume the events that the application sends to the egress topic. Run the following command in a dedicated terminal:
    management-cli kafka consumer-json --topic=icp4ba-bai-orders-kafka

    The following message appears: Retrieving events from topic icp4ba-bai-orders-kafka.

    Events that are sent to the egress topic are displayed in this terminal.

  8. Send the events out of order, within the delay of 10 seconds (10000 milliseconds).
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-1-order-created.json \
    && management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-3-order-done.json \
    && management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-2-order-processed.json
    In the Kafka consumer terminal, the output for events that are sent out of order within the delay is similar to the following:
    Event read (key=null partition=0, offset=10/11): {"operationName":"order-process","correlationId":"order_5","timestamp":"2022-04-01T00:22:12.000-04:00","state":"ORDER_CREATED","startTimestamp":"2022-04-01T00:22:12.000-04:00","data":{}}
    Event read (key=null partition=0, offset=11/12): {"operationName":"order-process","correlationId":"order_5","timestamp":"2022-04-07T00:05:42.000-04:00","state":"ORDER_DONE","startTimestamp":"2022-04-01T00:22:12.000-04:00","endTimestamp":"2022-04-07T00:05:42.000-04:00","duration":517410,"data":{"paid":true}}
    Event read (key=null partition=0, offset=12/13): {"operationName":"order-process","correlationId":"order_5","timestamp":"2022-04-07T00:05:42.000-04:00","state":"ORDER_DONE","startTimestamp":"2022-04-01T00:22:12.000-04:00","endTimestamp":"2022-04-07T00:05:42.000-04:00","duration":517410,"data":{"shipped":true,"paid":true}}
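    Although event 3 arrived before event 2, the processing application waited for the delay before it completed the operation, so event 2 was still aggregated into the same summary. The last summary contains the data of both events, and its state remains ORDER_DONE because event 2 is not more recent than the summary.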
  9. Send the events out of order again, but wait longer than the delay before you send event 2.
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-1-order-created.json \
    && management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-3-order-done.json \
    && sleep 11 \
    && management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-2-order-processed.json

    When event 2 is sent after the delay has elapsed, the output is similar to the following:

    Event read (key=null partition=0, offset=13/14): {"operationName":"order-process","correlationId":"order_5","timestamp":"2022-04-01T00:22:12.000-04:00","state":"ORDER_CREATED","startTimestamp":"2022-04-01T00:22:12.000-04:00","data":{}}
    Event read (key=null partition=0, offset=14/15): {"operationName":"order-process","correlationId":"order_5","timestamp":"2022-04-07T00:05:42.000-04:00","state":"ORDER_DONE","startTimestamp":"2022-04-01T00:22:12.000-04:00","endTimestamp":"2022-04-07T00:05:42.000-04:00","duration":517410,"data":{"paid":true}}
    Event read (key=null partition=0, offset=15/16): {"operationName":"order-process","correlationId":"order_5","timestamp":"2022-04-03T00:13:31.000-04:00","state":"ORDER_PROCESSED","data":{"shipped":true}}
    Because the operation was completed before event 2 arrived, event 2 is treated as the first event of a new summary.
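The computation in summary-order.jslt can be replayed outside the processing application to see why the last summary of step 8 contains both data fields and a duration of 517410 seconds, and why event 2 alone produces a summary without a startTimestamp in step 9. The following Python sketch is a hypothetical re-expression of the template, not part of the product: the functions summarize and replay are illustrative, more_recent is an assumed stand-in for the $isBusinessEventMoreRecent variable (here, true unless the event is older than the summary), and letting the more recent side win on conflicting data keys is an assumption about the intent of the template's operand order.

```python
# Hypothetical re-expression of summary-order.jslt for illustration only.
# Assumptions: "more_recent" stands in for $isBusinessEventMoreRecent, and on
# conflicting data keys the more recent side wins.
import math
from datetime import datetime


def _parse(ts: str) -> datetime:
    # The JSLT pattern "yyyy-MM-dd'T'HH:mm:ss" covers the first 19 characters;
    # this sketch drops milliseconds and the UTC offset (all samples use -04:00).
    return datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S")


def summarize(summary: dict, event: dict, more_recent: bool, name: str = "order-process") -> dict:
    ts, status = event["timestamp"], event["status"]
    out = {"operationName": name, "correlationId": event["id"]}
    # start-time() and end-time(): the event value wins, else keep the summary value.
    if status == "ORDER_CREATED":
        out["startTimestamp"] = ts
    elif summary.get("startTimestamp") is not None:
        out["startTimestamp"] = summary["startTimestamp"]
    if status == "ORDER_DONE":
        out["endTimestamp"] = ts
    elif summary.get("endTimestamp") is not None:
        out["endTimestamp"] = summary["endTimestamp"]
    # duration(): whole seconds between start and end, once both are known.
    if "startTimestamp" in out and "endTimestamp" in out:
        delta = _parse(out["endTimestamp"]) - _parse(out["startTimestamp"])
        out["duration"] = math.floor(delta.total_seconds())
    # state() and timestamp(): follow the more recent of the event and the summary.
    out["state"] = status if more_recent else summary.get("state")
    out["timestamp"] = ts if more_recent else summary.get("timestamp")
    # aggregate-data(): merge the data of the event and of the summary.
    previous, current = summary.get("data") or {}, event.get("data") or {}
    out["data"] = {**previous, **current} if more_recent else {**current, **previous}
    return out


def replay(events: list) -> dict:
    """Fold events, in arrival order, into a single summary."""
    summary: dict = {}
    for event in events:
        # Hypothetical rule: the event is more recent unless it is older than the summary.
        more_recent = not summary or _parse(event["timestamp"]) >= _parse(summary["timestamp"])
        summary = summarize(summary, event, more_recent)
    return summary


e1 = {"id": "order_5", "timestamp": "2022-04-01T00:22:12.000-04:00", "status": "ORDER_CREATED"}
e2 = {"id": "order_5", "timestamp": "2022-04-03T00:13:31.000-04:00",
      "status": "ORDER_PROCESSED", "data": {"shipped": True}}
e3 = {"id": "order_5", "timestamp": "2022-04-07T00:05:42.000-04:00",
      "status": "ORDER_DONE", "data": {"paid": True}}

final = replay([e1, e3, e2])  # step 8: event 2 arrives within the delay
print(final["state"], final["duration"], final["data"])
# ORDER_DONE 517410 {'shipped': True, 'paid': True}

fresh = replay([e2])  # step 9: event 2 starts a new summary
print(fresh["state"], fresh["data"], "startTimestamp" in fresh)
# ORDER_PROCESSED {'shipped': True} False
```

The duration follows from the sample timestamps: 2022-04-01T00:22:12 to 2022-04-07T00:05:42 is 6 days (518400 seconds) minus 16 minutes 30 seconds (990 seconds), that is 517410 seconds.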

What to do next

See Cleaning up the Processing Application to clean up all the resources after a deployment.