Handling uncompleted operations

The summary remains in an intermediary state until operations on a context are completed. In some cases, you may want to monitor these intermediary results.

About this task

This procedure shows how to read an uncompleted sequence of correlated events from a Kafka ingress topic, write the uncompleted summary to an OpenSearch egress index, and visualize that summary in Business Performance Center. You then send the last event, which completes the sequence, visualize the results in Business Performance Center, and verify that the summary is removed from the uncompleted OpenSearch index.

Procedure

  1. Create a folder named test-app containing the following files:
    • processing-conf.json - file defining a Kafka ingress, a context with a custom operation, a Kafka egress, and an OpenSearch egress.
      {
          "ingresses": [
            {
              "uid": "I0001",
              "type": "kafka",
              "topic": "icp4ba-bai-orders"
            }
          ],
          "contexts": [
            {
              "name": "orders",
              "correlationPath": {
                "type": "jslt-inline",
                "expression": ".id"
              },
              "operations": [
                {
                  "uid": "C0001",
                  "type": "custom",
                  "name": "intermediary-steps",
                  "conditions": {
                    "completion": {
                      "type": "jslt-inline",
                      "expression": "test(.status, \"ORDER_DONE\")"
                    },
                    "aggregationCompletion": {
                      "type": "jslt-inline",
                      "expression": "test(.status, \"ORDER_DONE\") and .numberOfSteps != 0"
                    },
                    "delay": 1000
                  },
                  "aggregation": {
                    "type": "jslt-file",
                    "filename": "computation.jslt"
                  },
                  "inSelector": {
                    "type": "jslt-inline",
                    "expression": "test(.kind, \"order\")"
                  },
                  "outSelector": {
                    "type": "jslt-inline",
                    "expression": ".numberOfSteps != 0"
                  },
                  "egressRefs": ["opensearch-computation", "kafka-computation"]
                }
              ]
            }
          ],
          "egresses": [
            {
              "uid": "E0001",
              "type": "opensearch",
              "name": "opensearch-computation",
              "index": {
                "name": "icp4ba-bai-computation-egress",
                "uncompletedOperationsIndex": {
                  "name": "icp4ba-bai-computation-in-progress-egress"
                },
                "docId": {
                  "type": "jslt-inline",
                  "expression": ".correlationId"
                },
                "mapping": {
                  "filename": "opensearch-mapping.json"
                }
              }
            },
            {
              "uid": "E0010",
              "type": "kafka",
              "name": "kafka-computation",
              "topic": "icp4ba-bai-computation",
              "enabled": true
            }
          ],
          "settings": {
            "verboseLogs": true
          }
      }
    • computation.jslt - file defining the computation that is applied to correlated events to generate the summaries.

      def increment()
          if (test($businessEvent.status, "ORDER_CREATED"))
              1
          else if ($summary.numberOfSteps != null)
              $summary.numberOfSteps+1
          else
              0
      
      def status()
        if ($isBusinessEventMoreRecent)
          { "status": $businessEvent.status }
        else
          { "status": $summary.status }
      
      status()
      +
      {
          "numberOfSteps" : increment(),
          "previousSummaryVal" : $summary.numberOfSteps,
          "correlationId" : $correlationId
      }
    • opensearch-mapping.json - file defining the mapping that is created for the OpenSearch indices.

      {
          "dynamic": true,
          "properties": {
            "id": {
              "type": "keyword"
            },
            "kind": {
              "type": "keyword"
            },
            "status": {
              "type": "keyword"
            },
            "timestamp": {
              "type": "date"
            }
          }
      }
    • monitoring-sources.json - file defining the monitoring sources for each OpenSearch index so they can be visualized in Business Performance Center.

      [
          {
            "id": "computation-in-progress-egress",
            "elasticsearchIndex": "icp4ba-bai-computation-in-progress-egress",
            "fields": [
              {
                  "field": "status"
              }
            ]
          },
          {
            "id": "computation-egress",
            "elasticsearchIndex": "icp4ba-bai-computation-egress",
            "fields": [
              {
                  "field": "status"
              }
            ]
          }
      ]
  2. Create a folder named test-data containing the following four event files:
    • event-1-created.json
      {"id": "order_1", "kind": "order", "timestamp": "2022-04-07T00:05:42.000-04:00", "status": "ORDER_CREATED"}
    • event-2-processing.json
      {"id": "order_1", "kind": "order", "timestamp": "2022-04-09T00:08:42.000-04:00", "status": "ORDER_PROCESSING"}
    • event-3-verification.json
      {"id": "order_1", "kind": "order", "timestamp": "2022-04-09T00:09:42.000-04:00", "status": "ORDER_VERIFICATION"}
    • event-4-completed.json
      {"id": "order_1", "kind": "order", "timestamp": "2022-04-09T00:10:42.000-04:00", "status": "ORDER_DONE"}
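The effect of computation.jslt on the four test events can be traced with a small Python sketch. It is an illustrative stand-in, not the product's JSLT engine: the function names aggregate and is_aggregation_complete are hypothetical, events are assumed to arrive in timestamp order (so $isBusinessEventMoreRecent is treated as true), $correlationId is assumed to be the value selected by correlationPath (.id), and the aggregationCompletion condition is assumed to be evaluated against the summary.

```python
import re


def aggregate(summary, event, correlation_id, event_is_more_recent=True):
    """Hypothetical Python equivalent of computation.jslt for one event."""
    # None plays the role of JSLT null when no summary exists yet.
    previous_steps = summary.get("numberOfSteps")

    # increment(): restart at 1 on ORDER_CREATED, otherwise add one step.
    if re.search("ORDER_CREATED", event["status"]):
        steps = 1
    elif previous_steps is not None:
        steps = previous_steps + 1
    else:
        steps = 0

    # status(): keep the status of whichever side is more recent.
    status = event["status"] if event_is_more_recent else summary.get("status")

    return {
        "status": status,
        "numberOfSteps": steps,
        "previousSummaryVal": previous_steps,
        "correlationId": correlation_id,
    }


def is_aggregation_complete(summary):
    """Mirror of: test(.status, "ORDER_DONE") and .numberOfSteps != 0."""
    status = summary.get("status")
    matches = status is not None and re.search("ORDER_DONE", status) is not None
    return matches and summary.get("numberOfSteps") != 0


# The four test events, reduced to the fields that the computation reads.
events = [
    {"id": "order_1", "kind": "order", "status": "ORDER_CREATED"},
    {"id": "order_1", "kind": "order", "status": "ORDER_PROCESSING"},
    {"id": "order_1", "kind": "order", "status": "ORDER_VERIFICATION"},
    {"id": "order_1", "kind": "order", "status": "ORDER_DONE"},
]

summary = {}   # assumption: no summary exists before the first event
history = []   # (numberOfSteps, status, complete) after each event
for event in events:
    summary = aggregate(summary, event, correlation_id=event["id"])
    complete = is_aggregation_complete(summary)
    history.append((summary["numberOfSteps"], summary["status"], complete))
    print(summary, "complete:", complete)
```

Under these assumptions, the summary counts one step per event, and only the fourth event satisfies the completion condition. This is why the summary stays in the uncompleted index until event-4-completed.json is sent.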
  3. Import the configuration.
    management-cli processing-conf import --name="test-app" --directory="./test-app"
  4. Deploy the application.
    management-cli processing-app deploy --name="test-app"
  5. To check the result of the processing, consume the events that the application outputs. Run the following command in a dedicated terminal:
    management-cli kafka consumer-json --topic=icp4ba-bai-computation

    The following message appears: Retrieving events from topic icp4ba-bai-computation.

    Events that are output to the egress topic are displayed in this terminal.

  6. Send the first three test events.
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-1-created.json 
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-2-processing.json 
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-3-verification.json
    After you send the first three events and before you send event-4-completed.json, the uncompleted summary is stored in the index defined in uncompletedOperationsIndex. You can monitor this index in Business Performance Center.
  7. Create a dashboard named Test dashboard. In this dashboard, create a chart of type Metric named Computation in progress and select the computation-in-progress-egress - ALL monitoring source. Save the chart. It displays a count of 1, which refers to the summary being computed.
  8. Create another Metric chart named Computation and select the computation-egress - ALL monitoring source. Save the chart, which also displays a count of 1.

    The value is 1 because the icp4ba-bai-computation-egress index specified in the processing configuration is mapped to an OpenSearch alias that points to indices containing completed and uncompleted summaries.

  9. Send the last event.
    management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-4-completed.json
    The Computation in progress chart displays a metric of 0 while the Computation chart displays a metric of 1.
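
The chart values in steps 7 to 9 follow from where the summary is stored at each point. The sketch below is a toy in-memory model in Python, not OpenSearch client code: the dictionaries, the write_summary and chart_counts helpers, and the separate backing index for completed summaries are assumptions made for illustration, based on the alias description in step 8.

```python
# Toy stand-ins for the OpenSearch indices, keyed by docId (.correlationId).
in_progress_index = {}  # icp4ba-bai-computation-in-progress-egress
completed_index = {}    # assumed backing index for completed summaries


def write_summary(doc_id, summary):
    """Store a summary; a completed summary leaves the in-progress index."""
    completed = "ORDER_DONE" in summary["status"] and summary["numberOfSteps"] != 0
    if completed:
        in_progress_index.pop(doc_id, None)
        completed_index[doc_id] = summary
    else:
        in_progress_index[doc_id] = summary


def chart_counts():
    """Document counts as the two Metric charts would report them."""
    # The icp4ba-bai-computation-egress alias covers completed and
    # uncompleted summaries, so its count is taken over both stores.
    alias_ids = set(in_progress_index) | set(completed_index)
    return {
        "Computation in progress": len(in_progress_index),
        "Computation": len(alias_ids),
    }


statuses = ["ORDER_CREATED", "ORDER_PROCESSING", "ORDER_VERIFICATION", "ORDER_DONE"]
counts = []
for step, status in enumerate(statuses, start=1):
    write_summary("order_1", {"status": status, "numberOfSteps": step})
    counts.append(chart_counts())

counts_after_three = counts[2]  # state observed in steps 7 and 8
counts_after_four = counts[3]   # state observed in step 9
print(counts_after_three)
print(counts_after_four)
```

Because every summary for order_1 is written under the same document ID, the model holds at most one document per order. The completed summary replaces the uncompleted one instead of adding to it.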

What to do next

See Cleaning up the Processing Application to clean all the resources after a deployment.