Processing configuration

The processing configuration is made of a set of json and JSLT files.

The processing-conf.json is the main configuration file and is mandatory to have. In this file, you define the sources and the destinations of events and how they are processed. It can refer to other files such as JSLT transformation definitions, and OpenSearch mapping definitions.

The monitoring-sources.json provides a monitoring source definition for Business Performance Center to query the relevant data stored in OpenSearch. It has details on locating data to build monitoring dashboards and charts.

All the event data manipulation (selection, transformation) is done through JSLT External link opens a new window or tab scripting. A JSLT expression can be declared directly in processing-conf.json as a property value (inline) or in a dedicated file. In the latter case, a processing-conf.json property refers to the file name (which must have the .jslt extension)

Example processing-conf.json

The following processing-conf.json defines one Kafka ingress, one Kafka egress and one OpenSearch egress:

{
  "ingresses": [
    {
      "uid": "I0001",
      "type": "kafka",
      "topic": "icp4ba-order-ingress"
    }
  ],
  "egresses": [
    {
      "uid": "E0001",
      "type": "kafka",
      "topic": "icp4ba-order-egress"
    },
    {
      "uid": "E0002",
      "type": "opensearch",
      "index": {
        "name": "icp4ba-orders",
        "docId": {
          "type": "jslt-inline",
          "expression": ".id"
        },
        "mapping": {
          "filename": "orders-mapping.json"
        }
      },
      "selector": {
        "type": "jslt-file",
        "filename": "event-selector.jslt"
      },
      "transformer": {
        "type": "jslt-file",
        "filename": "event-transformer.jslt"
      }
    }
  ],
  "settings": {
    "verboseLogs": true
  }
}

Expected configuration files for this example are:

<configuration directory>
   |-- processing-conf.json
   |-- orders-mapping.json => custom OpenSearch mapping
   |-- event-selector.jslt => custom jslt, referenced by processing-conf.json
   +-- event-transformer.jslt => another custom jslt, referenced by processing-conf.json

unique ID (uid)

The ingress and egress instances require a unique ID (uid) within the processing configuration. This ID allows identifying underlying processing Job operators.

The contexts require a unique name and each operation inside a context need a unique ID (uid).

The length of the uid must be 5. However, a length of up to 36 is authorized for compatibility reasons but is deprecated. While the value of a uid is free, by convention all ingress uid start with I, all egress start with E and a number to identify the instance.

Warning: You must not change the uid after you set it. This identifier is used to upgrade the processing when you redeploy your application with the configuration changes. It is assigned to a Flink operator to manage the Flink job recovery through checkpoints and savepoints. As a consequence after a uid is set in the processing configuration, it must not be changed. Otherwise, the Flink job fails to recover from the checkpoints and savepoints previously generated as they do not match.

ingress

The processing configuration must define only one ingress instance.

  • uid: [required] A unique identifier within the processing configuration. See Unique ID.
  • type: [required] The only supported value is kafka
  • topic: [required] Name of the Kafka topic to consume inbound events from.
  • timestampPath: [optional] JSLT expression that indicates the path to a field containing the timestamp in an inbound event (by default a 'timestamp' field is expected at the root level).
  • selector: [optional] JSLT expression that selects the events to be processed by the current ingress. See Filtering events.
  • transformer: [optional] JSLT expression used to transform the event See Transforming events
  • egressRefs: [optional] an array of egress names. If specified this property must contain at least one egress name. Allows you to sink events directly to specified egresses when the processing application uses stateful processing.

egress

The processing configuration must define at least one egress instance.

  • uid: [required] A unique identifier within the processing configuration. See Unique ID.
  • name: [optional] A unique name within the current processing configuration that can be referenced via an egressRef parameter.
  • type: [required] Supported types are kafka, opensearch and elasticsearch.
    Note: The opensearch and elasticsearch types are identical. Support for the elasticsearch type is maintained solely for backward compatibility with existing processing. In practice, both opensearch and elasticsearch type egresses function as opensearch egresses.
  • selector: [optional] JSLT expression that select the events to be processed by the current egress.
  • transformer: [optional] JSLT expression used to transform the event.
With type: kafka
  • topic: [required] Name of the Kafka topic to produce outbound events to.
  • enabled: [optional] Indicates if this egress is enabled. (Default is false)
  • kind: [optional] Identifies this egress as a dead letter queue when the value is deadLetterQueue. Only one egress of kind deadLetterQueue is allowed. Possible values egress(default) and deadLetterQueue

With type: opensearch or elasticsearch

  • index: [required]
    • name: [required] Name of the index where the result of processing is written.
    • docID: [required] JSLT expression that must return, for the event being processed, the value to use as the OpenSearch document Id.
    • uncompletedOperationsIndex: [optional] Indicates which OpenSearch index to use to store uncompleted summaries coming from a context operation that references this egress using its name. Required in stateful processing when the egress is referenced by a custom operation.
      • name: [required] Name of an OpenSearch index to store active summaries.
  • mapping: [optional] A mapping definition to apply to new indices. If the index template already exists with a different mapping, the template mapping will be updated, and a rollover happens. The new active index will apply the new mapping to documents created from the egress.
    • filename: [required] Name of the JSON file that contains the OpenSearch mapping definition. File extension must be .json.
  • settings: [optional] OpenSearch settings to apply to new indexes.
    • index.number_of_shards: Number of shards.
    • index.number_of_replicas: Number of replicas

context (optional)

  • name: [required] the unique name of the correlation context.
  • correlationPath: [required] JSLT expression that indicates the path to a field holding the correlation identifier in an inbound event. This path can also be a composition of multiple field paths in that event. It can be a string or a number.
  • operations: [required] the operations to perform on business events - must contain one or multiple supported operations

duration operation

  • uid: [required] A unique identifier within the current processing configuration.
  • type: [required] duration
  • name: [required] a unique name in the processing configuration file corresponding to the field containing the duration.
  • conditions: [required] the conditions to compute the duration.
    • started: [required] JSLT expression defining a predicate that returns true to identify an event with the start timestamp.
    • completed: [required] JSLT expression defining a predicate that returns true to identify an event with the end timestamp.
    • delay: [optional] the delay in millisecond that triggers Flink job state after the completion condition becomes true (default is 10000).

custom operation

  • uid: [required] A unique identifier within the current processing configuration.
  • type: [required] custom
  • conditions: [required] the conditions to compute the aggregation of business events.
    • completion: [required] JSLT expression defining a predicate that returns true to identify a completion event to complete the aggregation.
    • aggregationCompletion: [required] JSLT expression defining a predicate that returns true to identify an aggregation completion. Any opensearch egress referenced by egressRefs must also define the configuration property uncompletedOperationsIndex.
    • delay: [optional] the delay in millisecond that for the operation to be considered as completed after the completion condition becomes true (default is 10000). Within this delay, out of order events received in this context keep being processed and aggregated in the summary.
  • aggregation: [required] JSLT expression providing the information to aggregate business events an produce a summary.
  • inSelector: [optional] JSLT expression defining a predicate to indicate if the events must be processed by the operation (default is to accept all business events)
  • outSelector: [optional] JSLT expression defining a predicate to indicate if the summary must be produced (default is to produce a summary whenever a business event is aggregated).
  • egressRefs: [required] an array of at least one egress name parameter.

settings

  • verboseLogs: [optional] A flag indicating whether additional information is logged at INFO log level. (default is false).
  • deserializationErrorMode: [optional] is an enumeration of strings designed to control the behavior of deserialization error handling in your application. Possible values [LOG_ERROR_AND_CANCEL_JOB, LOG_ERROR_AND_SKIP_EVENT, LOG_ERROR_AND_USE_DLQ] (default: "LOG_ERROR_AND_CANCEL_JOB")
    Warning: When using LOG_ERROR_AND_USE_DLQ as deserialization error mode, you need to have one Kafka egress of kind deadLetterQueue in your processing configuration.
  • opensearch: [optional]
    • bulk.flush.max.actions: [optional] maximum amount of actions to buffer before flushing (default is 1).
    • bulk.flush.interval.ms: [optional] interval at which to flush regardless of the amount of buffered actions (default is -1).
Tip: By tuning both the bulk.flush.max.actions and bulk.flush.interval.ms parameters, you can improve the OpenSearch indexing performance. The following values typically provide a good throughput with the default OpenSearch settings.
"bulk.flush.max.actions": 8
"bulk.flush.interval.ms": 2000

JSLT

JSLT can be defined as an inline expression or in a dedicated file:

  • type: indicates whether the jslt expression in a file (jslt-file) or as a string value (jslt-inline)
  • filename: name of the file that contains the jslt expression (required if type: jslt-file). File extension must be .jslt
  • expression: inline jslt expression (required if type: jslt-inline)

Monitoring sources

Monitoring sources allow to visualize the result of the processing written in OpenSearch through Business Performance Center charts. Monitoring sources are defined in an optional JSON file named monitoring-sources.json. This file must be located next to the processing-conf.json file in the processing-app folder before importing the configuration. Several monitoring sources can be included in the processing-conf.json file.

Monitoring sources are defined with following attributes:

  • id: A unique identifier/name to identify the monitoring source when you create charts.
  • elasticsearchIndex: Refers to the OpenSearch index configuration name defined in the processing-conf.json
    • fields: [optional] Specify fields to identify monitoring sources. If more than one field is required to identify monitoring sources, you can provide several pairs of field and label fields.
      • field: field used to retrieve and aggregate monitoring source data

      • labelField: field to serve as the label for the monitoring sources. If no label field is provided, the value of the field that is specified to retrieve and aggregate data is used as the label.
For example, consider the following definition:
"fields": [
        {
            "field": "item_id.keyword",
            "labelField": "item_name"
        }
      ]
  • item_id: This field consists in a keyword type in OpenSearch which represents the identifier of an item. To specify the associated field with a keyword type in OpenSearch, you must append the .keyword suffix to the field name.
  • item_name: This field consists in a text type in OpenSearch that represents the name of an item.
One monitoring source exists for each distinct value of the item_id field. The corresponding value of the item_name field is used as a label for selecting the monitoring sources in Business Performance Center. If more than one versions exist of a given item with a given identifier, you can use an item_version field to specify one monitoring source for each version of item.
"fields": [
        {
            "field": "item_id.keyword",
            "labelField": "item_name"
        },
        {
            "field": "item_version"
        }
      ]
Note: In the monitoring_source_fields section, field elements must have the type keyword in OpenSearch whereas no type constraint exists on the fields that are used as label_fields.

Example of monitoring-sources.json file:

[
  {
    "id": "Orders",
    "elasticsearchIndex": "icp4ba-bai-orders",
    "fields": [
        {
            "field": "status",
            "labelField": "statusLabel"
        }
      ]
  },
  {
    "id": "Shipments",
    "elasticsearchIndex": "icp4ba-bai-shipments"
  }
]