Message processing using a Kafka provider

With a Kafka messaging provider, you can process outbound messages sequentially and inbound messages sequentially or continuously. In general, message processing is similar to JMS message processing.

Outbound sequential message processing

The integration framework uses Kafka topics as outbound, sequential message queues. When a message is sent from the integration framework, it is routed to your Kafka provider, where it is hosted as a JSON message in a predefined topic. Although publish channels can send messages in XML or JSON format, the product wraps each message in a JSON-formatted message and writes it to the Kafka topic. An outbound message uses the structure that is shown in the following code example:
{
      "interfacetype":"MAXIMO",
      "INTERFACE":"MXPRInterface",
      "payload":"<base64 encoded compressed xml/json message>",
      "SENDER":"MX",
      "destination":"<external system name>",
      "destjndiname":"<kafka topic name>",
      "compressed":"1",
      "MEAMessageID":"<providername>~<topic>~<partition>~<offset>",
      "mimetype":"application/xml"
}
The payload property contains the payload that is sent to the endpoint. This value is a compressed and Base64-encoded version of the original message. Kafka messages that are generated by the product are stored in compressed form inside the Kafka partitions. Within a partition, messages are stored in the order in which they are written. Each message is assigned a sequential ID number, known as the message offset, that identifies its position in the partition. Although Kafka topics can be partitioned, sequentially processed topics must be in a single partition.
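As a rough illustration of how a consumer might unwrap such a message, the following Python sketch splits the MEAMessageID value into its parts and decodes the payload. The compression codec is not stated in this topic; the sketch assumes zlib purely for illustration, and the function names and sample values (myprovider, mytopic, the PR fragment) are hypothetical.

```python
import base64
import json
import zlib


def parse_message_id(message_id):
    """Split <providername>~<topic>~<partition>~<offset> into its parts."""
    provider, topic, partition, offset = message_id.split("~")
    return {"provider": provider, "topic": topic,
            "partition": int(partition), "offset": int(offset)}


def decode_payload(envelope):
    """Return the original message text from the wrapper's payload property.

    ASSUMPTION: the payload is zlib-compressed; the real codec may differ.
    """
    raw = base64.b64decode(envelope["payload"])
    if envelope.get("compressed") == "1":
        raw = zlib.decompress(raw)
    return raw.decode("utf-8")


# Build a sample wrapper with the same assumed codec so the sketch is self-contained.
original = "<PR><PRNUM>1001</PRNUM></PR>"
encoded = base64.b64encode(zlib.compress(original.encode("utf-8"))).decode("ascii")
envelope = json.loads(json.dumps({
    "interfacetype": "MAXIMO",
    "INTERFACE": "MXPRInterface",
    "payload": encoded,
    "compressed": "1",
    "MEAMessageID": "myprovider~mytopic~0~42",
    "mimetype": "application/xml",
}))

decoded = decode_payload(envelope)
message_id = parse_message_id(envelope["MEAMessageID"])
print(decoded)
print(message_id)
```

Because the message ID carries the partition and offset, a value such as this one can be used to locate the exact record in the topic when you investigate a failed message.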

After the message is written to the Kafka topic, it is processed by the associated Kafka cron task that you configure in the Cron Task Setup application for each topic. Processing occurs according to the external system and endpoint that are configured for the topic.

Inbound sequential processing

Inbound message processing can be sequential or continuous. Inbound sequential processing is similar to outbound sequential processing and requires the same types of configuration, including an instance of the Kafka cron task.

The structure of a sequential inbound message is shown in the following example:
{
     "interfacetype": "MAXIMO",
     "SENDER": "testkafka",
     "destination": "testkafka",
     "USER": "wilson",
     "MEAMessageID": "….",
     "INTERFACE": "MXASSETInterface",
     "payload": "{\"assetnum\":\"AZ163\",\"siteid\":\"BEDFORD\"}",
     "mimetype": "application/json",
     "destjndiname": "anamitratestcont"
}
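Because the payload is a JSON document that is carried as a string inside another JSON document, its quotation marks must be escaped. The following Python sketch shows one way that a sending application might build such a message so that the escaping is handled by the serializer. The function name build_inbound_message is hypothetical, and the MEAMessageID property is left out because its value is not shown in the example.

```python
import json


def build_inbound_message(sender, destination, user, interface, record, topic):
    """Wrap a record in the inbound message structure (hypothetical helper)."""
    return json.dumps({
        "interfacetype": "MAXIMO",
        "SENDER": sender,
        "destination": destination,
        "USER": user,
        "INTERFACE": interface,
        # Serialize the record first; the outer json.dumps call then
        # escapes the quotation marks of the nested document.
        "payload": json.dumps(record, separators=(",", ":")),
        "mimetype": "application/json",
        "destjndiname": topic,
    })


message = build_inbound_message(
    sender="testkafka",
    destination="testkafka",
    user="wilson",
    interface="MXASSETInterface",
    record={"assetnum": "AZ163", "siteid": "BEDFORD"},
    topic="anamitratestcont",
)

# Reading the message back takes two parsing steps: wrapper, then payload.
parsed = json.loads(message)
record = json.loads(parsed["payload"])
print(message)
```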

Error processing for sequential queues

With sequential queues, when a message still fails after the retry count that is specified for the queue is reached, the Kafka cron task stops and does not move to the next message until the failed message is corrected or marked for deletion in the Message Reprocessing application, or until the message expires in Kafka.
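The blocking behavior can be pictured with a small Python model. This is a toy sketch and not the cron task's implementation: the function name, the handler, and the interpretation of the retry count (one initial attempt plus retry_count retries) are assumptions.

```python
def process_sequential(messages, handler, retry_count):
    """Process messages in order and stop at the first one that keeps failing.

    Returns (processed, blocked_at). blocked_at is the index of the message
    that stopped processing, or None if every message succeeded.
    ASSUMPTION: a message gets one initial attempt plus retry_count retries.
    """
    processed = []
    for index, message in enumerate(messages):
        attempts = 0
        while True:
            try:
                handler(message)
                processed.append(message)
                break
            except ValueError:
                attempts += 1
                if attempts > retry_count:
                    # Later messages are not touched while this one is blocked.
                    return processed, index
    return processed, None


def handler(message):
    """Hypothetical handler that rejects one specific message."""
    if message == "bad":
        raise ValueError("cannot process message")


processed, blocked_at = process_sequential(["a", "bad", "c"], handler, retry_count=2)
print(processed, blocked_at)
```

In this model, message "c" is never attempted, which mirrors how a single failed message holds up everything behind it in a sequential queue.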

Expired messages can be corrected and retried manually, but they are not removed from the error table automatically. You must configure the KafkaErrorsCleanup cron task to remove expired messages after the specified retention time if they are marked for deletion. Set the product's retention time to match the retention time that is set in Kafka.

Inbound continuous processing

Inbound continuous message processing with Kafka is similar to JMS message processing. Differences are found in error processing.

Error processing for continuous inbound queues

For continuous inbound processing, you can set up an error queue in Kafka, register it in the product as a sequential inbound queue, and specify it as the error queue for continuous inbound processing. Specify a Kafka cron task instance to process messages from the error queue.

When you configure the error queue in the External Systems application, you specify a message retry count and a retention time. The retry count value determines how many times the Kafka cron task for the continuous queue tries to reprocess the message before the message is moved to the error queue. With each error retry, the Kafka continuous consumer rewrites the message to the front of the queue and continues to move forward through the offsets. If new messages are written to the queue before the rewrite occurs, the new messages are processed before the rewritten message. Because of this ordering, an error can be resolved when a new message provides an object that the previously failed message needs.
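The following Python sketch models this retry behavior in a simplified form. It is a toy model under stated assumptions, not the consumer's implementation: the message kinds ("asset" and "workorder"), the dependency rule, and the function name are hypothetical, and a failed message is simply placed behind the messages that are already waiting.

```python
from collections import deque


def process_continuous(messages, retry_count):
    """Toy model of continuous processing with rewrite-on-error.

    Each message is a (kind, asset) tuple. A "workorder" message succeeds
    only if an "asset" message for the same asset was processed earlier.
    """
    queue = deque((message, 0) for message in messages)
    known_assets = set()
    processed, error_queue = [], []
    while queue:
        (kind, asset), retries = queue.popleft()
        if kind == "asset":
            known_assets.add(asset)
            processed.append((kind, asset))
        elif asset in known_assets:
            processed.append((kind, asset))
        elif retries < retry_count:
            # Rewrite the failed message behind the messages already waiting.
            queue.append(((kind, asset), retries + 1))
        else:
            # Retries are used up, so the message goes to the error queue.
            error_queue.append((kind, asset))
    return processed, error_queue


# The work order arrives before the asset that it depends on.
resolved = process_continuous(
    [("workorder", "AZ163"), ("asset", "AZ163")], retry_count=1)
# No asset message ever arrives, so the retries run out.
unresolved = process_continuous([("workorder", "ZZ999")], retry_count=2)
print(resolved)
print(unresolved)
```

In the first call the work order fails once, is rewritten behind the asset message, and succeeds on the retry. In the second call the retries are exhausted and the message ends up in the error queue.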

Messages that fail processing are moved to the configured error queue in a Hold status. If you configure error queues for continuous queues in Kafka as sequential queues, the first failed message in the error queue stops the processing of the error queue until the failed message is fixed and retried or the message expires in Kafka.

If you do not specify an error queue, the framework tries to process a failed message according to the value that is specified for the retry count on the queue. If the message still fails, processing moves on to the next message, and an administrator needs to process the message in the Message Reprocessing application.

Expired messages can be corrected and retried manually, but they are not removed from the error table automatically. You must configure the KafkaErrorsCleanup cron task to remove expired messages after the specified retention time if they are marked for deletion. Set the product's retention time to match the retention time that is set in Kafka.

Configuration of a redelivery delay for continuous queues

An administrator can configure a redelivery delay that works when a retry count is specified for a continuous queue. When a redelivery delay is specified in the mxe.int.kafkaredeldelay property, the cron task for the queue does not rewrite a message to the front of the Kafka queue until after the delay time. The delay time is specified in seconds.

For example, if the delay is set to 30 seconds, the cron task for the continuous queue pauses for 30 seconds before it rewrites the message with errors to the front of the queue. The delay allows other messages to be processed in the queue, some of which can contain objects that are needed to successfully process the message that had errors. The drawback of using this mechanism is that the delay slows down the processor because it delays the cron task from continued processing.
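To get a feel for the cost of the delay, the following Python sketch estimates the total time that the cron task spends paused. It assumes that each rewrite is preceded by exactly one pause and that pauses do not overlap; both points are assumptions for illustration, and the function name is hypothetical.

```python
def worst_case_pause_seconds(failing_messages, retry_count, delay_seconds):
    """Estimate total pause time, assuming one pause before every rewrite."""
    if min(failing_messages, retry_count, delay_seconds) < 0:
        raise ValueError("inputs must not be negative")
    return failing_messages * retry_count * delay_seconds


# Five messages that never succeed, a retry count of 3, and a 30-second delay.
pause = worst_case_pause_seconds(failing_messages=5, retry_count=3, delay_seconds=30)
print(f"{pause} seconds ({pause / 60:.1f} minutes)")
```

Under these assumptions the example adds 450 seconds of pause time, which is why a long delay combined with a high retry count can noticeably reduce throughput.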