Defining the egresses

You can define egresses of type kafka and of type opensearch. An egress of type opensearch writes the result of the processing to an OpenSearch index. You must define at least one egress of type opensearch to monitor the data in Business Performance Center.

Before you begin

An egress definition includes the following properties:
  • uid: [required] A unique identifier within the processing configuration. See the unique ID section in Processing configuration.
  • name: A unique name within the current processing configuration that can be referenced via an egressRef parameter.
  • type: [required] Supported types are kafka, opensearch and elasticsearch.
  • selector: [optional] JSLT expression that selects the events to be processed by the current egress.
  • transformer: [optional] JSLT expression used to transform the event.
With type kafka
  • topic: [required] Name of the Kafka topic to produce outbound events to.
  • enabled: [optional] Indicates if this egress is enabled. (Default is false)

With type opensearch or elasticsearch

  • index: [required]
    • name: [required] Name of the index where the result of processing is written.
    • docId: [required] JSLT expression that must return, for the event being processed, the value to use as the OpenSearch document ID.
    • uncompletedOperationsIndex: [optional] Indicates which OpenSearch index to use to store uncompleted summaries coming from a context operation that references this egress using its name. Required in stateful processing when the egress is referenced by a custom operation.
      • name: [required] Name of the OpenSearch index where active summaries are stored.
  • mapping: [optional] A mapping definition to apply to new indices. If the index template already exists with a different mapping, the template mapping is updated and a rollover occurs. The new active index then applies the new mapping to documents created from the egress.
    • filename: [required] Name of the JSON file that contains the OpenSearch mapping definition. The file extension must be .json.
  • settings: [optional] OpenSearch settings to apply to new indices.
    • index.number_of_shards: number of shards.
    • index.number_of_replicas: number of replicas.

JSLT expressions can be inline or provided in a separate file. See the JSLT section in Processing configuration.
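To illustrate how the opensearch properties listed above nest, the following sketch shows an egress that declares a name and an index for uncompleted summaries. The egress name orders-egress and the index name icp4ba-bai-order-active are hypothetical, and the docId object reuses the jslt-inline form shown in the procedure later in this topic. Treat it as a sketch under those assumptions, not as a reference configuration.

```json
{
  "egresses": [
    {
      "uid": "E0001",
      "name": "orders-egress",
      "type": "opensearch",
      "index": {
        "name": "icp4ba-bai-order-series",
        "docId": {
          "type": "jslt-inline",
          "expression": ".id + \"#\" + .seq"
        },
        "uncompletedOperationsIndex": {
          "name": "icp4ba-bai-order-active"
        }
      }
    }
  ]
}
```

The name property matters only when another part of the configuration refers to this egress, for example through an egressRef parameter.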

The following is an example of a kafka egress definition:
"ingresses": [
  {
    "uid": "I0001",
    "type": "kafka",
    "topic": "icp4ba-bai-order"
  }
],
"egresses": [
  {
    "uid": "E0001",
    "type": "kafka",
    "topic": "icp4ba-bai-order-series"
  }
]
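
The optional selector and transformer properties can be added to a kafka egress in the same way. The following sketch assumes that both properties accept the same jslt-inline object form as the docId property in the procedure later in this topic. The two expressions are hypothetical: the selector keeps only events whose kind is order, and the transformer reduces each event to its id and status. The enabled property is set explicitly because the property list gives its default as false.

```json
{
  "egresses": [
    {
      "uid": "E0001",
      "type": "kafka",
      "topic": "icp4ba-bai-order-series",
      "enabled": true,
      "selector": {
        "type": "jslt-inline",
        "expression": ".kind == \"order\""
      },
      "transformer": {
        "type": "jslt-inline",
        "expression": "{ \"id\": .id, \"status\": .status }"
      }
    }
  ]
}
```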

An OpenSearch mapping can optionally be applied to the index to replace the default OpenSearch dynamic mapping.

The following is an example of an OpenSearch mapping file:
{
  "dynamic": true,
  "properties": {
    "id": {
      "type": "keyword"
    },
    "status": {
      "type": "keyword"
    },
    "timestamp": {
      "type": "date"
    }
  }
}
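
An egress can reference a mapping file like the one above through its mapping property, alongside index settings. In the following sketch, the file name order-mapping.json and the shard and replica counts are hypothetical values. Placing mapping and settings at the same level as index, and writing the settings as a flat object keyed by the setting names, follows the property list in this topic and is an assumption rather than a verified schema.

```json
{
  "egresses": [
    {
      "uid": "E0001",
      "type": "opensearch",
      "index": {
        "name": "icp4ba-bai-order-series",
        "docId": {
          "type": "jslt-inline",
          "expression": ".id + \"#\" + .seq"
        }
      },
      "mapping": {
        "filename": "order-mapping.json"
      },
      "settings": {
        "index.number_of_shards": 1,
        "index.number_of_replicas": 1
      }
    }
  ]
}
```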

For Business Performance Center to see the events written to OpenSearch, you must define one or more monitoring sources in a JSON file named monitoring-sources.json. The procedure below includes a monitoring source definition.

About this task

This example shows how to:

  • process order events from an ingress Kafka topic.
  • write the content of each event as a document in OpenSearch.
  • ensure that each document has a unique id.
  • set the document id as the concatenation of the event .id and .seq properties, separated by a # character.
  • use the default mapping.
  • declare a monitoring source and monitor the result of the processing through Business Performance Center.

For an example of creating an egress of type kafka, see Defining transformers.

Procedure

  1. Create a folder named test-app containing the following two files:
    • processing-conf.json - a file defining a Kafka ingress and an opensearch egress.
      {
        "ingresses": [
          {
            "uid": "I0001",
            "type": "kafka",
            "topic": "icp4ba-bai-order"
          }
        ],
        "egresses": [
          {
            "uid": "E0001",
            "type": "opensearch",
            "index": {
              "name": "icp4ba-bai-order-series",
              "docId": {
                "type": "jslt-inline",
                "expression": ".id + \"#\" + .seq"
              }
            }
          }
        ]
      }
    • monitoring-sources.json - a file defining a monitoring source that makes the data available for visualization in Business Performance Center.
      [
        {
          "id": "Orders",
          "elasticsearchIndex": "icp4ba-bai-order-series"
        }
      ]
  2. Create a folder named test-data containing a file named order.events.txt with the following content:
    { "id": "order_1", "kind": "order", "seq": 1, "timestamp": "2021-05-07T00:00:01.000-04:00", "status": "ORDER_CREATED" }
    { "id": "order_1", "kind": "order", "seq": 2, "timestamp": "2021-05-07T00:00:04.000-04:00", "status": "ORDER_PROCESSED", "data": { "shipped": true } }
    { "id": "order_1", "kind": "order", "seq": 3, "timestamp": "2021-05-07T00:00:07.000-04:00", "status": "ORDER_DONE", "data": { "paid": true } }
    
  3. Create a Processing Application.
    management-cli processing-app create --name="test-app"
  4. Import the configuration.
    management-cli processing-conf import --name="test-app" --directory="./test-app"
  5. Deploy the application.
    management-cli processing-app deploy --name="test-app"
  6. Send the events.
    management-cli kafka producer-json --topic=icp4ba-bai-order --file=./test-data/order.events.txt --batch
  7. Visualize the result of the event processing through dashboard charts in Business Performance Center.
    1. Log in to Business Performance Center.
    2. Create a new dashboard, and name it Orders.
    3. Create a chart, type Metric, and name it All orders.
    4. Select the monitoring source: Orders - ALL. The chart displays a Count of 3.
    5. Create a chart, type Data, and name it Data.
    6. In the Visualization tab, add data columns for each property. The details of the events are visible in the Data chart.
    7. Save your dashboard.

What to do next

Clean all the resources after a deployment. See Cleaning up the Processing Application.