Transformers allow you to perform JSON-to-JSON transformations on events. Transformers can
be defined and executed in the ingresses and in the egresses of the Processing Application. The result
of a transformation is a JSON object that can have a new structure. Transformers are defined in
JSLT.
About this task
The objective of this exercise is to read order events from a Kafka ingress topic, transform
them into a different JSON format, and send the new JSON objects
to a Kafka egress topic. See the JSLT section in Processing configuration.
Procedure
- Create a folder named test-app containing the following two files:
- processing-conf.json - a file defining a Kafka ingress and a Kafka egress
that applies the transformer.
{
  "ingresses": [
    {
      "uid": "I0001",
      "type": "kafka",
      "topic": "icp4ba-bai-order"
    }
  ],
  "egresses": [
    {
      "uid": "E0001",
      "type": "kafka",
      "topic": "icp4ba-bai-order-transformed",
      "enabled": true,
      "transformer": {
        "type": "jslt-file",
        "filename": "transform.jslt"
      }
    }
  ]
}
- transform.jslt - a file defining the transformation to apply.
def status-message(status)
  if ($status == "ORDER_CREATED")
    "order created"
  else if ($status == "ORDER_PROCESSED")
    "order being processed"
  else if ($status == "ORDER_DONE")
    "order complete"
  else
    "unknown"
{
  "order": .id,
  "status": status-message(.status) + " at " + .timestamp
}
- Create a folder named test-data containing an order.events.txt file
with the following content:
{ "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
{ "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
{ "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
- Create a Processing Application.
management-cli processing-app create --name="test-app"
- Import the configuration.
management-cli processing-conf import --name="test-app" --directory="./test-app"
- Deploy the application.
management-cli processing-app deploy --name="test-app"
- Start a consumer on the egress topic so that you can check the result of the processing. Run
the following command in a dedicated terminal:
management-cli kafka consumer-json --topic=icp4ba-bai-order-transformed
The following message appears: Retrieving events from topic
icp4ba-bai-order-transformed.
Events written to the egress topic are displayed in this terminal.
- Send the events.
management-cli kafka producer-json --topic=icp4ba-bai-order --file=./test-data/order.events.txt --batch
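Before sending the events, it can help to confirm that every line of order.events.txt parses as a JSON object. The following Python sketch is not part of the product; it assumes the file holds one JSON object per line, as in the sample above, and the helper name validate_json_lines is hypothetical. The sample events are embedded so that the script runs on its own.

```python
import json


def validate_json_lines(text):
    """Parse one JSON object per line; raise ValueError on the first bad line.

    Hypothetical helper for local checking only. It assumes the events file
    holds one JSON object per line, as in the sample order.events.txt.
    """
    events = []
    for number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            event = json.loads(line)
        except json.JSONDecodeError as err:
            raise ValueError(f"line {number} is not valid JSON: {err}") from err
        if not isinstance(event, dict):
            raise ValueError(f"line {number} is not a JSON object")
        events.append(event)
    return events


# Sample content copied from test-data/order.events.txt.
SAMPLE = """\
{ "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
{ "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
{ "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
"""

events = validate_json_lines(SAMPLE)
print(len(events), "events parsed")
```

To check the real file instead of the embedded sample, read it with open("./test-data/order.events.txt").read() and pass the text to the same function.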
Results
You should see the transformed events from the egress topic in the terminal where you started
the kafka consumer-json command:
Retrieving events from topic icp4ba-bai-order-transformed
Event read (key=null partition=0, offset=0/1): {"order":"order_1","status":"order created at 2021-05-07T00:00:01.000-04:00"}
Event read (key=null partition=0, offset=1/2): {"order":"order_1","status":"order being processed at 2021-05-07T00:00:04.000-04:00"}
Event read (key=null partition=0, offset=2/3): {"order":"order_1","status":"order complete at 2021-05-07T00:00:07.000-04:00"}
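To reason about the expected payloads without a deployment, the logic of transform.jslt can be mirrored in a few lines of Python. This is only an illustrative sketch: it is not how the Processing Application evaluates JSLT, it assumes that id, status, and timestamp are present in each event, and it does not reproduce JSLT handling of missing or null fields. The names status_message and transform are hypothetical.

```python
import json

# Same mapping as the status-message function in transform.jslt.
STATUS_MESSAGES = {
    "ORDER_CREATED": "order created",
    "ORDER_PROCESSED": "order being processed",
    "ORDER_DONE": "order complete",
}


def status_message(status):
    """Return the readable message for a status code, or "unknown"."""
    return STATUS_MESSAGES.get(status, "unknown")


def transform(event):
    """Build the output object described by transform.jslt.

    Assumes "id", "status", and "timestamp" exist in the input event.
    """
    return {
        "order": event["id"],
        "status": status_message(event["status"]) + " at " + event["timestamp"],
    }


# First sample event from order.events.txt.
event = {
    "id": "order_1",
    "kind": "order",
    "seq": 1,
    "timestamp": "2021-05-07T00:00:01.000-04:00",
    "status": "ORDER_CREATED",
}

# Compact separators match the formatting of the consumer output.
print(json.dumps(transform(event), separators=(",", ":")))
# prints {"order":"order_1","status":"order created at 2021-05-07T00:00:01.000-04:00"}
```

The printed object matches the payload of the first "Event read" line in the results above; the other input fields (kind, seq, data) are dropped because the transformation does not reference them.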
What to do next
See Cleaning up the Processing
Application to clean up all the resources after a deployment.