Defining the egresses
You can define egresses of type kafka and of type opensearch. An egress of type
opensearch writes the result of the processing to an OpenSearch index. You must
define at least one egress of type opensearch to monitor the data in Business Performance Center.
Before you begin
- uid: [required] A unique identifier within the processing configuration. See the unique ID section in Processing configuration.
- name: A unique name within the current processing configuration that can be referenced via an egressRef parameter.
- type: [required] Supported types are kafka, opensearch and elasticsearch.
- selector: [optional] JSLT expression that selects the events to be processed by the current egress.
- transformer: [optional] JSLT expression used to transform the event.
- topic: [required] Name of the Kafka topic to produce outbound events to.
- enabled: [optional] Indicates if this egress is enabled. (Default is false)
With type opensearch or elasticsearch
- index: [required]
  - name: [required] Name of the index where the result of processing is written.
  - docId: [required] JSLT expression that must return, for the event being processed, the value to use as the OpenSearch document ID.
- uncompletedOperationsIndex: [optional] Indicates which OpenSearch index to use to store uncompleted summaries coming from a context operation that references this egress using its name. Required in stateful processing when the egress is referenced by a custom operation.
  - name: [required] The name of an OpenSearch index to store active summaries.
- mapping: [optional] A mapping definition to apply to new indices. If the index template already exists with a different mapping, the template mapping is updated and a rollover happens. The new active index applies the new mapping to documents created from the egress.
  - filename: [required] Name of the JSON file that contains the OpenSearch mapping definition. The file extension must be .json.
- settings: [optional] OpenSearch settings to apply to new indices.
  - index.number_of_shards: Number of shards.
  - index.number_of_replicas: Number of replicas.
JSLT expressions can be inline or provided in a separate file; see the JSLT section in Processing configuration.
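To illustrate how the opensearch parameters listed above might fit together, the following is a minimal sketch of an egress that writes to an index with a custom mapping file and explicit settings. The nesting of name and docId under index, the inline form of the JSLT expression, and the values E0002, order-series and order-mapping.json are assumptions made for this illustration only; verify the exact structure against Processing configuration.

```json
"egresses": [
  {
    "uid": "E0002",
    "type": "opensearch",
    "index": {
      "name": "order-series",
      "docId": ".id"
    },
    "mapping": {
      "filename": "order-mapping.json"
    },
    "settings": {
      "index.number_of_shards": 1,
      "index.number_of_replicas": 1
    }
  }
]
```

Omitting the mapping and settings entries would leave the index on the default OpenSearch dynamic mapping and default settings.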
The following is an example of a kafka egress definition:
"ingresses": [
  {
    "uid": "I0001",
    "type": "kafka",
    "topic": "icp4ba-bai-order"
  }
],
"egresses": [
  {
    "uid": "E0001",
    "type": "kafka",
    "topic": "icp4ba-bai-order-series"
  }
]
An OpenSearch mapping can optionally be applied to the index to replace the default OpenSearch dynamic mapping.
The following is an example of an OpenSearch mapping file:
{
  "dynamic": true,
  "properties": {
    "id": {
      "type": "keyword"
    },
    "status": {
      "type": "keyword"
    },
    "timestamp": {
      "type": "date"
    }
  }
}
For Business Performance Center to see the events written to OpenSearch, you need to define one or more monitoring sources. These are defined in
a JSON file named monitoring-sources.json. The following procedure includes
monitoring source definitions.
About this task
This example shows how to:
- process order events from an ingress Kafka topic.
- write the content of each event as a document in OpenSearch.
- ensure that each document has a unique id.
- set the document id as the concatenation of the event .id and .seq.
- use the default mapping.
- declare a monitoring source and monitor the result of the processing through Business Performance Center.
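The goals listed above could translate into a processing configuration along the following lines. This is a sketch under stated assumptions, not the reference configuration: the index name order-events is hypothetical, the nesting of name and docId under index is inferred from the parameter list, and the docId expression assumes that JSLT's + operator concatenates the .id and .seq fields as strings. No mapping entry is given, so the default mapping applies. The monitoring source definition is not shown because its format is not described here.

```json
"ingresses": [
  {
    "uid": "I0001",
    "type": "kafka",
    "topic": "icp4ba-bai-order"
  }
],
"egresses": [
  {
    "uid": "E0001",
    "type": "opensearch",
    "index": {
      "name": "order-events",
      "docId": ".id + .seq"
    }
  }
]
```

Because the document ID is derived from both fields, two events with the same .id but a different .seq are written as separate documents rather than overwriting each other.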
For an example of creating an egress of type kafka, see Defining transformers.
Procedure
What to do next
Clean all the resources after a deployment. See Cleaning up the Processing Application.