Filtering events

The Processing Application can filter events by using selectors. A selector configuration defines a condition, and events that do not match that condition are not processed.

About this task

Note: Defining a selector is optional. A selector can be defined on any ingress or egress instance, either as an inline expression or in a dedicated JSLT file. Selectors can also be used in context operations through the inSelector and outSelector parameters, as described in Custom operations.

The objective of this exercise is to read events from a Kafka ingress topic, keep only the incoming events whose kind is order at the ingress level, then keep only the orders whose status is ORDER_DONE at the egress level, and write the result to a Kafka egress topic.

The ingress filter is defined as an inline expression, and the egress filter is defined in a dedicated file.

Procedure

  1. Create a folder named test-app containing the following two files:
    • processing-conf.json - a file defining Kafka ingress and egress.
      {
        "ingresses": [
          {
            "uid": "I0001",
            "type": "kafka",
            "topic": "icp4ba-bai-order",
            "selector": {
              "type": "jslt-inline",
              "expression": "test(.kind, \"order\")"
            }
          }
        ],
        "egresses": [
          {
            "uid": "E0001",
            "type": "kafka",
            "topic": "icp4ba-bai-order-completed",
            "enabled": true,
            "selector": {
              "type": "jslt-file",
              "filename": "filter-egress.jslt"
            }
          }
        ]
      }
      
    • filter-egress.jslt - a file defining the filter to apply at the egress level.
      test(.status, "ORDER_DONE")
  2. Create a folder named test-data containing a file named order.events.txt with the following content:
    { "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
    { "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
    { "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
    { "id": "shipment_1", "kind": "shipment", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_DONE" }
  3. Create a Processing Application.
    management-cli processing-app create --name="test-app"
  4. Import the configuration.
    management-cli processing-conf import --name="test-app" --directory="./test-app"
  5. Deploy the application.
    management-cli processing-app deploy --name="test-app"
  6. Start a Kafka consumer on the egress topic by running the following command in a dedicated terminal:
    management-cli kafka consumer-json --topic=icp4ba-bai-order-completed

    The following message appears: Retrieving events from topic icp4ba-bai-order-completed.

    Events that are written to the egress topic are displayed in this terminal.

  7. Send the test events.
    management-cli kafka producer-json --topic=icp4ba-bai-order --file=./test-data/order.events.txt

Results

The result from the egress topic appears in the terminal where you started the consumer-json command. Only one event is logged, because only one event has both the kind order and the status ORDER_DONE:
$ management-cli kafka consumer-json --topic=icp4ba-bai-order-completed
Retrieving events from topic icp4ba-bai-order-completed
Event read (key=null partition=0, offset=0/1): {"id":"order_1","kind":"order","seq":3,"timestamp":"2021-05-07T00:00:07.000-04:00","status":"ORDER_DONE","data":{"paid":true}}
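
To see why only this event reaches the egress topic, the two selectors can be mimicked offline. The following Python sketch is an illustration only and is not part of the Processing Application. The helper names (jslt_test, ingress_selector, egress_selector, run_pipeline) are hypothetical, and it assumes that the JSLT test function behaves like a regular-expression search that matches anywhere in the value and returns false for a missing field.

```python
import json
import re

# The four sample events from order.events.txt, one JSON object per line.
EVENTS = """\
{ "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
{ "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
{ "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
{ "id": "shipment_1", "kind": "shipment", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_DONE" }
"""


def jslt_test(value, pattern):
    """Assumed stand-in for JSLT test(): regex search, false when the value is missing."""
    if value is None:
        return False
    return re.search(pattern, str(value)) is not None


def ingress_selector(event):
    # Mirrors the inline expression: test(.kind, "order")
    return jslt_test(event.get("kind"), "order")


def egress_selector(event):
    # Mirrors filter-egress.jslt: test(.status, "ORDER_DONE")
    return jslt_test(event.get("status"), "ORDER_DONE")


def run_pipeline(lines):
    """Apply the ingress selector first, then the egress selector."""
    events = [json.loads(line) for line in lines if line.strip()]
    accepted = [event for event in events if ingress_selector(event)]
    return [event for event in accepted if egress_selector(event)]


result = run_pipeline(EVENTS.splitlines())
for event in result:
    print(json.dumps(event, separators=(",", ":")))
```

In this sketch, the shipment event is rejected at the ingress level even though its status is ORDER_DONE, and the first two order events are rejected at the egress level, which leaves only the order_1 event with seq 3.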

What to do next

See Cleaning up the Processing Application to clean all the resources after a deployment.