Transforming events

Transformers perform JSON-to-JSON transformations on events. You can define and run transformers in the ingresses and in the egresses of the Processing Application. A transformation produces a JSON object whose structure can differ from that of the input event. Transformers are written in JSLT.

About this task

The objective of this exercise is to read order events from a Kafka ingress topic, transform them into a different JSON format, and send the new JSON objects to a Kafka egress topic. For more information, see the JSLT section in Processing configuration.

Procedure

  1. Create a folder named test-app containing the following two files:
    • processing-conf.json - a file defining a Kafka ingress and a Kafka egress.
      {
        "ingresses": [
          {
            "uid": "I0001",
            "type": "kafka",
            "topic": "icp4ba-bai-order"
          }
        ],
        "egresses": [
          {
            "uid": "E0001",
            "type": "kafka",
            "topic": "icp4ba-bai-order-transformed",
            "enabled": true,
            "transformer": {
              "type": "jslt-file",
              "filename": "transform.jslt"
            }
          }
        ]
      }
    • transform.jslt - a file defining the transformation to apply.
      def status-message(status)
        if ($status == "ORDER_CREATED")
          "order created"
        else if ($status == "ORDER_PROCESSED")
          "order being processed"
        else if ($status == "ORDER_DONE")
          "order complete"
        else
          "unknown"
      
      {
        "order": .id,
        "status": status-message(.status) + " at " + .timestamp
      }
  2. Create a folder named test-data that contains a file named order.events.txt with the following content:
    { "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
    { "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
    { "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
  3. Create a Processing Application.
    management-cli processing-app create --name="test-app"
  4. Import the configuration.
    management-cli processing-conf import --name="test-app" --directory="./test-app"
  5. Deploy the application.
    management-cli processing-app deploy --name="test-app"
  6. To check the result of the processing, start a consumer on the egress topic. Run the following command in a dedicated terminal:
    management-cli kafka consumer-json --topic=icp4ba-bai-order-transformed

    The following message appears: Retrieving events from topic icp4ba-bai-order-transformed.

    Events that are output to the egress topic are displayed in this terminal.

  7. Send the events.
    management-cli kafka producer-json --topic=icp4ba-bai-order --file=./test-data/order.events.txt --batch
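
To preview what the transformer is expected to produce before you deploy anything, the logic of transform.jslt can be approximated outside the Processing Application. The following Python sketch is an illustration only: it is not how the Processing Application evaluates JSLT, the names STATUS_MESSAGES, transform, and transform_lines are hypothetical, and it ignores JSLT details such as null handling for missing fields.

```python
import json

# Mirrors the status-message function in transform.jslt (illustrative only).
STATUS_MESSAGES = {
    "ORDER_CREATED": "order created",
    "ORDER_PROCESSED": "order being processed",
    "ORDER_DONE": "order complete",
}

# The three sample events from test-data/order.events.txt, one JSON object per line.
EVENTS = """\
{ "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
{ "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
{ "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
"""


def transform(event: dict) -> dict:
    """Approximate the JSLT template: keep the id and build a status string.

    Assumes the event carries "id" and "timestamp"; unlike JSLT, a missing
    field raises KeyError here.
    """
    message = STATUS_MESSAGES.get(event.get("status"), "unknown")
    return {
        "order": event["id"],
        "status": f"{message} at {event['timestamp']}",
    }


def transform_lines(text: str) -> list:
    """Parse one JSON event per non-empty line and transform each of them."""
    return [transform(json.loads(line)) for line in text.splitlines() if line.strip()]


if __name__ == "__main__":
    for result in transform_lines(EVENTS):
        # Compact separators give the same payload layout as the consumer output.
        print(json.dumps(result, separators=(",", ":")))
```

Fields that the template does not reference, such as kind, seq, and data, do not appear in the transformed objects.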

Results

You should see the following output from the egress topic in the terminal where you started the kafka consumer-json command:
Retrieving events from topic icp4ba-bai-order-transformed

Event read (key=null partition=0, offset=0/1): {"order":"order_1","status":"order created at 2021-05-07T00:00:01.000-04:00"}

Event read (key=null partition=0, offset=1/2): {"order":"order_1","status":"order being processed at 2021-05-07T00:00:04.000-04:00"}

Event read (key=null partition=0, offset=2/3): {"order":"order_1","status":"order complete at 2021-05-07T00:00:07.000-04:00"}
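
If you capture the consumer output to a file or a string, a short script can extract the JSON payloads for checking. The following Python sketch assumes that each event line has the "Event read (...): {json}" layout of the sample output above; the function names parse_event_line and parse_output are hypothetical, and the line format might differ in other versions of the CLI.

```python
import json

# Sample consumer output, copied from the results shown above.
SAMPLE_OUTPUT = """\
Retrieving events from topic icp4ba-bai-order-transformed

Event read (key=null partition=0, offset=0/1): {"order":"order_1","status":"order created at 2021-05-07T00:00:01.000-04:00"}

Event read (key=null partition=0, offset=1/2): {"order":"order_1","status":"order being processed at 2021-05-07T00:00:04.000-04:00"}

Event read (key=null partition=0, offset=2/3): {"order":"order_1","status":"order complete at 2021-05-07T00:00:07.000-04:00"}
"""


def parse_event_line(line: str):
    """Return the JSON payload of an 'Event read (...): {...}' line, or None."""
    if not line.startswith("Event read ("):
        return None
    # The payload starts after the first "): " that closes the metadata part.
    _, _, payload = line.partition("): ")
    if not payload:
        return None
    return json.loads(payload)


def parse_output(text: str) -> list:
    """Collect the payloads of all event lines, skipping banners and blank lines."""
    events = []
    for line in text.splitlines():
        event = parse_event_line(line.strip())
        if event is not None:
            events.append(event)
    return events


if __name__ == "__main__":
    for event in parse_output(SAMPLE_OUTPUT):
        print(event["order"], "->", event["status"])
```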


What to do next

To remove all the resources after a deployment, see Cleaning up the Processing Application.