You can customize event processing by defining custom operations. A custom operation uses
JSLT scripts to process correlated events and produce an intermediary event called
a summary. Summaries can be sent to egresses and monitored.
About this task
When a new business event is received:
- the summary is retrieved from the state database by finding the summary whose
correlationId field matches the value of the event correlation path.
- in the aggregation transformation, the summary is enriched with the data from the current
event.
- the enriched summary is stored in the state database.
The summary completes when the correlated input event matches the completion condition specified in
the custom operation definition.
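The retrieve, enrich, store, and complete cycle described above can be sketched in Python. This is only an illustration, not the runtime's implementation: the in-memory `state_db` dictionary, the `eventCount` field, and the helper names are hypothetical, and the correlation path (`id`) and completion condition (`ORDER_DONE`) are borrowed from the example configuration in this topic.

```python
import re

# Hypothetical in-memory stand-in for the state database, keyed by correlationId.
state_db = {}

def correlation_id_of(event):
    # Assumption: the correlation path selects the event's "id" field.
    return event["id"]

def is_complete(event):
    # Assumption: the completion condition is "status matches ORDER_DONE".
    return re.search("ORDER_DONE", event.get("status", "")) is not None

def process(event):
    """Retrieve, enrich, and store the summary for one business event."""
    cid = correlation_id_of(event)
    # 1. Retrieve the summary whose correlationId matches (or start a new one).
    summary = state_db.get(cid, {"correlationId": cid, "eventCount": 0})
    # 2. Enrich the summary with data from the current event.
    enriched = {**summary,
                "state": event["status"],
                "eventCount": summary["eventCount"] + 1}
    # 3. Store the enriched summary back in the state database.
    state_db[cid] = enriched
    return enriched, is_complete(event)

first, done_first = process({"id": "order_1", "status": "ORDER_CREATED"})
last, done_last = process({"id": "order_1", "status": "ORDER_DONE"})
print(first, done_first)
print(last, done_last)
```

The second call finds the summary stored by the first one because both events share the same correlation identifier.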
Within the custom operation, you can also filter events by using the in and out
selectors: inSelector filters input events, whereas
outSelector determines whether aggregated outputs are sent to egresses.
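The roles of the two selectors can be sketched conceptually in Python. The predicates, the `eventCount` field, and the `run_operation` helper are hypothetical. The sketch also assumes that the out selector is evaluated against the aggregated summary and that a rejected summary is still kept for later events, which the text above does not confirm.

```python
def run_operation(events, in_selector, out_selector):
    """Aggregate the accepted events and collect the summaries that would be sent."""
    summary = {}
    emitted = []
    for event in events:
        if not in_selector(event):
            continue  # rejected inputs never reach the aggregation
        summary = {"state": event["status"],
                   "eventCount": summary.get("eventCount", 0) + 1}
        if out_selector(summary):
            emitted.append(summary)  # only selected outputs go to egresses
    return emitted

events = [
    {"kind": "order", "status": "ORDER_CREATED"},
    {"kind": "invoice", "status": "INVOICE_SENT"},
    {"kind": "order", "status": "ORDER_DONE"},
]

emitted = run_operation(
    events,
    in_selector=lambda e: e["kind"] == "order",         # keep order events only
    out_selector=lambda s: s["state"] == "ORDER_DONE",  # emit finished orders only
)
print(emitted)  # [{'state': 'ORDER_DONE', 'eventCount': 2}]
```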
The following table lists all the variables available in the operation transformation JSLT
script. The values are automatically set by the runtime and are accessible by using the syntax
$<Variable>.
Table 1. Variables available in JSLT script
| Variable | Description |
| --- | --- |
| name | The operation name |
| businessEvent | The current event |
| summary | The summary stored in the state |
| correlationId | The correlation identifier of an execution context |
| isBusinessEventMoreRecent | A boolean value indicating if the current event is more recent than the summary stored in the state |
| timestamp | The timestamp of the current event |
| timestampMillis | The timestamp of the current event in milliseconds |
Procedure
- Create a folder named test-app containing the following files:
- processing-conf.json file defining a Kafka ingress, a context with a custom
operation, and OpenSearch and Kafka egresses.
{
  "ingresses": [
    {
      "uid": "I0001",
      "type": "kafka",
      "topic": "icp4ba-bai-orders"
    }
  ],
  "contexts": [
    {
      "name": "orders",
      "correlationPath": {
        "type": "jslt-inline",
        "expression": ".id"
      },
      "operations": [
        {
          "uid": "C0001",
          "type": "custom",
          "name": "order-process",
          "conditions": {
            "completion": {
              "type": "jslt-inline",
              "expression": "test(.status, \"ORDER_DONE\")"
            },
            "aggregationCompletion": {
              "type": "jslt-inline",
              "expression": "test(.status, \"ORDER_DONE\") and (.startTimestamp != null)"
            },
            "delay": 1000
          },
          "aggregation": {
            "type": "jslt-file",
            "filename": "summary-order.jslt"
          },
          "egressRefs": ["opensearch-orders", "kafka-orders"]
        }
      ]
    }
  ],
  "egresses": [
    {
      "uid": "E0001",
      "type": "opensearch",
      "name": "opensearch-orders",
      "index": {
        "name": "icp4ba-bai-orders",
        "uncompletedOperationsIndex": {
          "name": "icp4ba-bai-orders-in-progress"
        },
        "docId": {
          "type": "jslt-inline",
          "expression": ".correlationId"
        }
      }
    },
    {
      "uid": "E0002",
      "type": "kafka",
      "name": "kafka-orders",
      "topic": "icp4ba-bai-orders-kafka",
      "enabled": true
    }
  ],
  "settings": {
    "verboseLogs": true
  }
}
- summary-order.jslt file defining the computation applied to correlated events to
generate the summaries.
let iso8601Format = "yyyy-MM-dd'T'HH:mm:ss"

def start-time()
  if (test($businessEvent.status, "ORDER_CREATED"))
    { "startTimestamp": $timestamp }
  else if ($summary.startTimestamp != null)
    { "startTimestamp": $summary.startTimestamp }
  else
    {}

def end-time()
  if (test($businessEvent.status, "ORDER_DONE"))
    { "endTimestamp": $timestamp }
  else if ($summary.endTimestamp != null)
    { "endTimestamp": $summary.endTimestamp }
  else
    {}

def duration()
  let startTime = get-key(start-time(), "startTimestamp", null)
  let endTime = get-key(end-time(), "endTimestamp", null)
  if ($startTime != null and $endTime != null)
    { "duration": floor(parse-time($endTime, $iso8601Format) - parse-time($startTime, $iso8601Format)) }
  else
    {}

def state()
  if ($isBusinessEventMoreRecent)
    { "state": $businessEvent.status }
  else
    { "state": $summary.state }

def timestamp()
  if ($isBusinessEventMoreRecent)
    { "timestamp": $timestamp }
  else
    { "timestamp": $summary.timestamp }

def previous-data()
  if ($summary.data != null)
    $summary.data
  else
    {}

def current-data()
  { for ($businessEvent.data) .key: .value }

def aggregate-data()
  if ($isBusinessEventMoreRecent)
    { "data" : current-data() + previous-data() }
  else
    { "data" : previous-data() + current-data() }

aggregate-data() + // data
duration() +       // duration
end-time() +       // endTimestamp
start-time() +     // startTimestamp
state() +
timestamp() +
{
  "operationName": $name,
  "correlationId": $correlationId
}
- Create a folder named test-data containing the following three events:
event-1-order-created.json:
{
  "id": "order_1",
  "kind": "order",
  "seq": 1,
  "timestamp": "2022-04-01T00:22:12.000-04:00",
  "status": "ORDER_CREATED"
}
event-2-order-processed.json:
{
  "id": "order_1",
  "kind": "order",
  "seq": 2,
  "timestamp": "2022-04-03T00:13:31.000-04:00",
  "status": "ORDER_PROCESSED",
  "data": {
    "shipped": true
  }
}
event-3-order-done.json:
{
  "id": "order_1",
  "kind": "order",
  "seq": 3,
  "timestamp": "2022-04-07T00:05:42.000-04:00",
  "status": "ORDER_DONE",
  "data": {
    "paid": true
  }
}
- Create a Processing Application.
management-cli processing-app create --name="test-app"
- Import the configuration.
management-cli processing-conf import --name="test-app" --directory="./test-app"
- Deploy the application.
management-cli processing-app deploy --name="test-app"
- Use the events processed by the application to check the result of the processing. Run
the following command in a dedicated terminal:
management-cli kafka consumer-json --topic=icp4ba-bai-orders-kafka
The following message appears: Retrieving events from topic
icp4ba-bai-orders-kafka.
Events that are output to the egress topic are displayed in this terminal.
- Send the first event.
management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-1-order-created.json
When the first event, event-1-order-created.json, is processed, the
startTimestamp field is created and assigned the value of this event's
timestamp:
Event read (key=null partition=0, offset=138/139): {"operationName":"order-process","correlationId":"order_1","timestamp":"2022-04-01T00:22:12.000-04:00","state":"ORDER_CREATED","startTimestamp":"2022-04-01T00:22:12.000-04:00","data":{}}
- Send the second event.
management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-2-order-processed.json
When the second event is processed, the data it contains is added to the summary, and
the state in the summary is updated:
Event read (key=null partition=0, offset=139/140): {"operationName":"order-process","correlationId":"order_1","timestamp":"2022-04-03T00:13:31.000-04:00","state":"ORDER_PROCESSED","startTimestamp":"2022-04-01T00:22:12.000-04:00","data":{"shipped":true}}
- Send the last event.
management-cli kafka producer-json --topic=icp4ba-bai-orders --file=test-data/event-3-order-done.json
The data of the last event is added to the summary, the endTimestamp field is
assigned the value of its timestamp, and the duration is computed as the difference, in seconds,
between the startTimestamp and the endTimestamp:
Event read (key=null partition=0, offset=140/141): {"operationName":"order-process","correlationId":"order_1","timestamp":"2022-04-07T00:05:42.000-04:00","state":"ORDER_DONE","startTimestamp":"2022-04-01T00:22:12.000-04:00","endTimestamp":"2022-04-07T00:05:42.000-04:00","duration":517410,"data":{"shipped":true,"paid":true}}
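The three summaries shown in this procedure can be cross-checked offline with a rough Python transcription of summary-order.jslt. This is a sketch under stated assumptions, not the runtime's behavior: `isBusinessEventMoreRecent` is approximated by comparing the event timestamp with the timestamp stored in the summary, timestamps are parsed only up to the seconds (mirroring the script's `yyyy-MM-dd'T'HH:mm:ss` pattern, so the UTC offset is ignored), and the function names are illustrative.

```python
import math
import re
from datetime import datetime

def seconds_between(start_ts, end_ts):
    # Parse only the first 19 characters; the UTC offset is ignored (assumption).
    fmt = "%Y-%m-%dT%H:%M:%S"
    start = datetime.strptime(start_ts[:19], fmt)
    end = datetime.strptime(end_ts[:19], fmt)
    return math.floor((end - start).total_seconds())

def aggregate(name, correlation_id, event, summary):
    ts = event["timestamp"]
    # Assumption: an event is "more recent" when no summary timestamp exists yet
    # or when its timestamp is not older than the stored one.
    more_recent = "timestamp" not in summary or ts[:19] >= summary["timestamp"][:19]
    out = {"operationName": name, "correlationId": correlation_id}
    out["state"] = event["status"] if more_recent else summary.get("state")
    out["timestamp"] = ts if more_recent else summary.get("timestamp")
    # start-time(): set on ORDER_CREATED, otherwise carried over from the summary.
    if re.search("ORDER_CREATED", event["status"]):
        out["startTimestamp"] = ts
    elif summary.get("startTimestamp") is not None:
        out["startTimestamp"] = summary["startTimestamp"]
    # end-time(): set on ORDER_DONE, otherwise carried over from the summary.
    if re.search("ORDER_DONE", event["status"]):
        out["endTimestamp"] = ts
    elif summary.get("endTimestamp") is not None:
        out["endTimestamp"] = summary["endTimestamp"]
    # duration(): only once both timestamps are known.
    if "startTimestamp" in out and "endTimestamp" in out:
        out["duration"] = seconds_between(out["startTimestamp"], out["endTimestamp"])
    # aggregate-data(): on a key conflict, the more recent side wins.
    previous, current = summary.get("data", {}), event.get("data", {})
    out["data"] = {**previous, **current} if more_recent else {**current, **previous}
    return out

events = [
    {"id": "order_1", "timestamp": "2022-04-01T00:22:12.000-04:00",
     "status": "ORDER_CREATED"},
    {"id": "order_1", "timestamp": "2022-04-03T00:13:31.000-04:00",
     "status": "ORDER_PROCESSED", "data": {"shipped": True}},
    {"id": "order_1", "timestamp": "2022-04-07T00:05:42.000-04:00",
     "status": "ORDER_DONE", "data": {"paid": True}},
]

summary, summaries = {}, []
for event in events:
    summary = aggregate("order-process", event["id"], event, summary)
    summaries.append(summary)

print(summaries[-1]["duration"])  # 517410
```

The duration is six days (518400 seconds) minus 16 minutes 30 seconds (990 seconds), which gives the 517410 seconds reported in the final summary.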
What to do next
See Cleaning up the Processing Application to clean up all the resources after a deployment.