Handling uncompleted operations
About this task
This procedure describes how to read an uncompleted list of correlated events from a Kafka ingress topic, write the uncompleted summary to an OpenSearch egress index, visualize the uncompleted summary in Business Performance Center, then read a last event that completes the sequence from the Kafka ingress topic, visualize the results in Business Performance Center and see that the summary is removed from the uncompleted OpenSearch index.
Procedure
-
Create a folder named
test-appcontaining the following files:processing-conf.json- file defining a Kafka ingress, a context with custom operation, a Kafka egress and an OpenSearch egress.{ "ingresses": [ { "uid": "I0001", "type": "kafka", "topic": "icp4ba-bai-orders" } ], "contexts": [ { "name": "orders", "correlationPath": { "type": "jslt-inline", "expression": ".id" }, "operations": [ { "uid": "C0001", "type": "custom", "name": "intermediary-steps", "conditions": { "completion": { "type": "jslt-inline", "expression": "test(.status, \"ORDER_DONE\")" }, "aggregationCompletion": { "type": "jslt-inline", "expression": "test(.status, \"ORDER_DONE\") and .numberOfSteps != 0" }, "delay": 1000 }, "aggregation": { "type": "jslt-file", "filename": "computation.jslt" }, "inSelector": { "type": "jslt-inline", "expression": "test(.kind, \"order\")" }, "outSelector": { "type": "jslt-inline", "expression": ".numberOfSteps != 0" }, "egressRefs": ["opensearch-computation", "kafka-computation"] } ] } ], "egresses": [ { "uid": "E0001", "type": "opensearch", "name": "opensearch-computation", "index": { "name": "icp4ba-bai-computation-egress", "uncompletedOperationsIndex": { "name": "icp4ba-bai-computation-in-progress-egress" }, "docId": { "type": "jslt-inline", "expression": ".correlationId" }, "mapping": { "filename": "opensearch-mapping.json" } } }, { "uid": "E0010", "type": "kafka", "name": "kafka-computation", "topic": "icp4ba-bai-computation", "enabled": true } ], "settings": { "verboseLogs": true } }-
computation.jslt- file defining the computation applied to correlated events to generate the summariesdef increment() if (test($businessEvent.status, "ORDER_CREATED")) 1 else if ($summary.numberOfSteps != null) $summary.numberOfSteps+1 else 0 def status() if ($isBusinessEventMoreRecent) { "status": $businessEvent.status } else { "status": $summary.status } status() + { "numberOfSteps" : increment(), "previousSummaryVal" : $summary.numberOfSteps, "correlationId" : $correlationId } -
opensearch-mapping.json- file defining the OpenSearch mapping that is going to be created for the OpenSearch indices{ "dynamic": true, "properties": { "id": { "type": "keyword" }, "kind": { "type": "keyword" }, "status": { "type": "keyword" }, "timestamp": { "type": "date" } } } -
monitoring-sources.json- file defining the monitoring sources for each OpenSearch index so they can be visualized in Business Performance Center.[ { "id": "computation-in-progress-egress", "elasticsearchIndex": "icp4ba-bai-computation-in-progress-egress", "fields": [ { "field": "status" } ] }, { "id": "computation-egress", "elasticsearchIndex": "icp4ba-bai-computation-egress", "fields": [ { "field": "status" } ] } ]
- Create a folder named
test-datacontaining the following 4 events:event-1-created.json{"id": "order_1", "kind": "order", "timestamp": "2022-04-07T00:05:42.000-04:00", "status": "ORDER_CREATED"}event-2-processing.json{"id": "order_1", "kind": "order", "timestamp": "2022-04-09T00:08:42.000-04:00", "status": "ORDER_PROCESSING"}event-3-verification.json{"id": "order_1", "kind": "order", "timestamp": "2022-04-09T00:09:42.000-04:00", "status": "ORDER_VERIFICATION"}event-4-completed.json{"id": "order_1", "kind": "order", "timestamp": "2022-04-09T00:10:42.000-04:00", "status": "ORDER_DONE"}
- Import the configuration.
management-cli processing-conf import --name="test-app" --directory="./test-app" - Deploy the application.
management-cli processing-app deploy --name="test-app" - Use the events processed by the application to check the result of the processing. Run
the following command in a dedicated terminal:
management-cli kafka consumer-json --topic=icp4ba-bai-computationThe following message appears:
Retrieving events from topic icp4ba-bai-order-computation.Events output to the egress topic is displayed in this terminal.
- Send the first three test events.
management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-1-created.json management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-2-processing.json management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-3-verification.jsonAfter sending the first events and before sending theevent-4-completed.json, the events are stored in the index defined inuncompletedOperationsIndex. You can monitor this index in Business Performance Center. - Create a dashboard named Test dashboard. In this dashboard, create a chart of type Metric named Computation in progress and select computation-in-progress-egress - ALL monitoring source. Save the chart, you should see the chart displaying a count of 1 which refers to the summary being computed.
- Create another Metric chart named Computation and select computation-egress -
ALL monitoring source. Save the chart which should also display a count of 1.
The value is 1 because the
icp4ba-bai-computation-egressindex specified in the processing configuration is mapped to an OpenSearch alias that points to indices containing completed and uncompleted summaries. - Send the last event.
management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-4-completed.jsonThe Computation in progress chart displays a metric of 0 while the Computation chart displays a metric of 1.