How to Create an Apache Kafka Consumer for Manta Events That Will Send a Notification Email Using Native API

This page uses concepts discussed in Manta Event System and Connector: Apache Kafka.

If possible, it is recommended to use the Spring Boot API, which provides additional functionalities and is easier to set up. The guide for setting up a Spring Boot project can be found here.

Prerequisites

This guide assumes that the following infrastructure is in place before you begin.

Setting Up Manta Event System

  1. Navigate to the configuration page dedicated to the Apache Kafka connector: Connections > Add connection > MANTA Event System > Kafka Events Plugin.

  2. Enter the following configurations.

    1. Bootstrap Servers - localhost:9092

    2. Topic Name - manta-events

  3. Click the Save button.

  4. Restart the Manta Service Utility instance for the changes to take effect (the connector is initialized when Manta Service Utility starts).
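
Before moving on, it can help to confirm that the configured bootstrap server accepts connections at all. The following is a minimal Python sketch, assuming the localhost:9092 address from step 2. The function name broker_reachable is hypothetical, and the check covers only TCP reachability; it does not verify that a Kafka broker is answering or that the manta-events topic exists.

```python
import socket


def broker_reachable(bootstrap_server: str, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to "host:port" can be opened.

    Hypothetical helper: it checks reachability only, not the Kafka protocol.
    """
    host, _, port = bootstrap_server.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout_s):
            return True
    except (OSError, ValueError):
        # OSError: refused, unreachable, or timed out; ValueError: malformed port
        return False
```

For example, broker_reachable("localhost:9092") should return True once the broker from this guide is running.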

Implementing the Event System Consumer in Java

Use the following libraries to implement the Event System Consumer in Java:

a. Implementing the Model

Manta Events are always delivered as JSON payloads in the format defined in Manta Event System | Event-Message-Format. In this example, we use the properties name and message. For more advanced filtering, you can use the other static fields or the dynamic public parameters, which are defined for each event individually (see Events).

Therefore, our model class will be implemented as follows.

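import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Date;

// Ignore payload fields that are not modeled here (for example, publicParameters),
// so that Jackson does not fail on unknown properties.
@JsonIgnoreProperties(ignoreUnknown = true)
public class MantaEvent {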

    // FIELDS AS DEFINED IN https://manta-io.atlassian.net/wiki/spaces/MTKB/pages/3155951635/MANTA+Event+System#Event-message-format
    private final String name;
    private final String message;
    private final Date timestamp;
    private final String type;
    private final String priority;

    /**
     * Create MANTA Event object. JSON creator constructor used by Jackson mapper for mapping from String JSON payload.
     * @param name name of the event
     * @param message message of the event
     * @param timestamp timestamp of the event generation
     * @param type type of the event (see docs for possible values)
     * @param priority priority of the event (see docs for possible values)
     */
    @JsonCreator
    public MantaEvent(@JsonProperty("name") String name,
                      @JsonProperty("message") String message,
                      @JsonProperty("timestamp") Date timestamp,
                      @JsonProperty("type") String type,
                      @JsonProperty("priority") String priority) {
        this.name = name;
        this.message = message;
        this.timestamp = timestamp;
        this.type = type;
        this.priority = priority;
    }

    // getters...
}

Note the use of the @JsonCreator and @JsonProperty annotations from the Jackson library, which is a recommended way of processing JSON payloads in Java. These annotations map the JSON fields to the class properties.
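
For comparison, the same mapping can be sketched in Python, the language used for the second consumer in this guide. This is an illustrative sketch rather than part of the attached project: the parse_event helper is hypothetical, it assumes the timestamp is in epoch milliseconds (as the Python consumer later in this guide treats it), and it silently drops any payload fields that the model does not declare.

```python
import json
from dataclasses import dataclass, fields
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class MantaEvent:
    name: Optional[str]
    message: Optional[str]
    timestamp: Optional[datetime]
    type: Optional[str]
    priority: Optional[str]


def parse_event(payload: str) -> MantaEvent:
    """Map a JSON payload to a MantaEvent, ignoring fields the model does not declare."""
    raw = json.loads(payload)
    data = {f.name: raw.get(f.name) for f in fields(MantaEvent)}
    if data["timestamp"] is not None:
        # Assumption: the timestamp is expressed in epoch milliseconds
        data["timestamp"] = datetime.fromtimestamp(data["timestamp"] / 1000, tz=timezone.utc)
    return MantaEvent(**data)
```

Fields that are missing from the payload are set to None rather than raising an error, which keeps the sketch tolerant of events that carry only some of the static fields.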

b. Implementing the Component for Sending an Email

For handling email communication, we will be using the Apache Commons Email library. This library greatly simplifies working with the native JavaMail API, which can be used directly if Apache Commons Email is not an option.

This component will consist of a single public method EmailSender.sendEventEmail(MantaEvent event), which will handle sending the email to the defined recipient.

To send an email through a custom application, we need the following information.

In the provided project, these fields are empty with a TODO message for them to be filled with the user’s information as described in this guide.

Once these fields are completed, we will implement the method for sending the actual email.

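// Package names as in Apache Commons Email 1.x
import org.apache.commons.mail.DefaultAuthenticator;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.HtmlEmail;

public class EmailSender {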

    // ... EMAIL fields

    public void sendEventEmail(MantaEvent event) {
        try {
            HtmlEmail email = new HtmlEmail();
            email.setHostName(MAIL_HOST);
            email.setSmtpPort(SMTP_PORT);
            email.setAuthenticator(new DefaultAuthenticator(FROM_ADDRESS, FROM_ADDRESS_PASSWORD));
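            // setSSL(boolean) is deprecated since Commons Email 1.3; setSSLOnConnect is its replacement
            email.setSSLOnConnect(true);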
            email.setFrom(FROM_ADDRESS);
            email.addTo(TO_ADDRESS);
            email.setSubject(EMAIL_SUBJECT);
            email.setHtmlMsg(event.getMessage()); // Set the message body equal to the event message
            email.send();
        } catch (EmailException e) {
            System.out.println("Failed to send the email.");
            e.printStackTrace();
        }
    }

}

In the preceding snippet, we built the email object with our parameters, set the predefined subject, and set the message body of the email equal to the event message.
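
The same message-building step can be sketched in Python with the standard library only. This is a minimal sketch under stated assumptions: the function name build_event_email and its parameters are hypothetical, and the sketch only constructs the message without sending it.

```python
from email.message import EmailMessage
from typing import Sequence


def build_event_email(event_message: str, sender: str,
                      recipients: Sequence[str], subject: str) -> EmailMessage:
    """Build an HTML email whose body is the event message."""
    msg = EmailMessage()
    msg["From"] = sender
    # Header values must be strings, so the recipients are joined with commas
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject
    # Mirror HtmlEmail.setHtmlMsg: the body is sent as text/html
    msg.set_content(event_message, subtype="html")
    return msg
```

A message built this way can be handed to smtplib.SMTP.send_message once the SMTP host, port, and credentials are known.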

c. Implementing the Apache Kafka Consumer

To implement the Apache Kafka consumer, we will be using the native Apache Kafka Clients library. This library supports only blocking consumers: the poll() call blocks until records arrive or the timeout expires, so a consumer that should run continuously must call it in an infinite loop. For asynchronous message consumption, use the Spring Kafka library in a Spring Boot application.
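
The blocking poll pattern described above can be sketched independently of any Kafka client. In the following Python sketch, poll, handle, and should_stop are hypothetical callables supplied by the caller; a real consumer would pass a function that wraps the client's poll call and, to run forever, a should_stop that always returns False.

```python
from typing import Callable, Optional


def run_poll_loop(poll: Callable[[float], Optional[str]],
                  handle: Callable[[str], None],
                  should_stop: Callable[[], bool],
                  timeout_s: float = 1.0) -> None:
    """Call poll repeatedly, passing every non-empty result to handle."""
    while not should_stop():
        record = poll(timeout_s)  # blocks for at most timeout_s seconds
        if record is None:
            continue  # nothing arrived within the timeout, so poll again
        handle(record)
```

Passing should_stop as a parameter is a design choice made for this sketch: it lets the loop be exercised in a test with a finite list of records instead of a live broker.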

To implement the consumer, we need the following information.

First, we are going to define the native consumer. We will implement this using a static factory method.

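import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MantaEventsKafkaConsumer {

    private static final String EVENT_SYSTEM_TOPIC = "manta-events";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static MantaEventsKafkaConsumer createKafkaConsumer(EmailSender emailSender) {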
        // Configure the consumer
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "MANTA-EVENT-SYSTEM-CONSUMERS");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Create the consumer
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

        // Subscribe to the Event System topic
        consumer.subscribe(Collections.singletonList(EVENT_SYSTEM_TOPIC));

        return new MantaEventsKafkaConsumer(consumer, emailSender);
    }

    // ...

}

In the preceding snippet, we first configured the consumer in four steps.

  1. Configure the bootstrap servers.

  2. Set the group ID of the consumer group that the consumer is running in.

  3. Set the key deserializer to StringDeserializer since the key in all Manta Events is a String.

  4. Set the value deserializer to StringDeserializer since the value in all Manta Events is a JSON in a String.

Then, we created the native Kafka consumer and subscribed to the manta-events topic.
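
For readers who will later follow the Python variant of this guide, the four configuration steps above map roughly as follows. This is a sketch, not a full consumer: build_consumer_config and decode_record are hypothetical helper names, and the sketch assumes that keys and values are UTF-8 encoded, which is the default encoding of StringDeserializer. The confluent-kafka Consumer returns keys and values as bytes, so the deserialization steps become explicit decoding.

```python
from typing import Optional, Tuple


def build_consumer_config(bootstrap_servers: str, group_id: str) -> dict:
    """Steps 1 and 2: bootstrap servers and consumer group ID."""
    return {"bootstrap.servers": bootstrap_servers, "group.id": group_id}


def decode_record(key: Optional[bytes], value: bytes) -> Tuple[Optional[str], str]:
    """Steps 3 and 4: decode the key and the JSON value as UTF-8 strings."""
    decoded_key = key.decode("utf-8") if key is not None else None
    return decoded_key, value.decode("utf-8")
```

The dictionary returned by build_consumer_config uses the same property names that the Java ConsumerConfig constants resolve to.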

Lastly, we will implement the method that the consumer uses to consume and handle incoming messages. As stated before, a consumer that should run at all times must poll in an infinite loop. We will use the poll() method of the Kafka consumer with a timeout of 1 second. If no messages are received in this time, we poll again. If messages are received, the consumer loops through them, maps each message value (the JSON payload of the Manta Event) to an object, and passes it to the email sender component.

public class MantaEventsKafkaConsumer {
    // ...

    public void run() {
        while (true) {
            // Poll for up to one second (requires import java.time.Duration)
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            // If no records received, continue
            if (consumerRecords.count() == 0) {
                continue;
            }

            // For each received event, map it to an object and send the email
            consumerRecords.forEach(event -> {
                MantaEvent mantaEvent = null;
                try {
                    mantaEvent = objectMapper.readValue(event.value(), MantaEvent.class);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    return;
                }

                emailSender.sendEventEmail(mantaEvent);
            });
        }
    }
}

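Within this method, we can implement more advanced filtering by replacing the unconditional sendEventEmail call with a condition. For example, we can filter by event name, priority, or type. The conditions below are independent alternatives: use one of them, or combine them with || so that an event matching several conditions produces only one email.

// Filter by event name (the constant comes first to avoid a NullPointerException on a missing field)
if ("WORKFLOW_EXECUTION_FINISHED_EVENT".equals(mantaEvent.getName())) {
    emailSender.sendEventEmail(mantaEvent);
}
// Filter by priority
if ("HIGH".equals(mantaEvent.getPriority())) {
    emailSender.sendEventEmail(mantaEvent);
}
// Filter by type
if ("ISSUE".equals(mantaEvent.getType())) {
    emailSender.sendEventEmail(mantaEvent);
}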

Note that the preceding examples are not exhaustive. For further filtering options, consult the documentation at Manta Event System | Event-Message-Format and Events.
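
When several conditions are in play, it can be convenient to express each one as a predicate and to send the email once if any of them matches. The following Python sketch illustrates that idea on events represented as plain dictionaries; the names by_field and matches_any are hypothetical and do not appear in the attached project.

```python
from typing import Callable, Iterable, Mapping

EventFilter = Callable[[Mapping], bool]


def by_field(field: str, expected: str) -> EventFilter:
    """Build a predicate that is true when event[field] equals expected."""
    return lambda event: event.get(field) == expected


def matches_any(event: Mapping, filters: Iterable[EventFilter]) -> bool:
    """Return True if at least one filter accepts the event."""
    return any(f(event) for f in filters)


FILTERS = [
    by_field("name", "WORKFLOW_EXECUTION_FINISHED_EVENT"),
    by_field("priority", "HIGH"),
    by_field("type", "ISSUE"),
]
```

Because matches_any returns a single boolean, an event that satisfies two of the filters still triggers only one notification.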

With this setup, it is now possible to run the program and receive emails according to the implemented filtering.

d. Full Project

The attached compressed file contains a Maven project that implements all the concepts discussed in this guide. There are TODO messages for the fields that can only be filled out in the context of the local environment that the demo is run in.

native-kafka-example.zip

Implementing the Event System Consumer in Python

Dependencies: confluent-kafka
Install with: python -m pip install confluent-kafka

a. config.py

config = {
    "broker": "localhost:9092",
    "topic": "manta-events",
    "group_id": "Mailer",
    "event": "WORKFLOW_EXECUTION_FINISHED_EVENT",
    "offset": "largest",
    "commit": "false",
    "poll": "86400000",
    "port": 25,
    "smtp_server": "",
    "sender": "",
    "password": "",
    "receivers": ["a@a.com","b@a.com"],
    "subject": "Manta Workflow Execution Status"
}
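
Several values in this file, such as smtp_server and sender, are left empty and must be filled in for the local environment. A small check run at startup can catch settings that were forgotten. The sketch below is one possible approach: the helper name missing_settings and the choice of required keys are assumptions, not part of the original script.

```python
# Assumption: these are the settings the consumer cannot work without
REQUIRED_KEYS = ("broker", "topic", "group_id", "smtp_server", "sender", "receivers")


def missing_settings(config: dict) -> list:
    """Return the required keys that are absent or empty in config."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]
```

With the file exactly as shown above, missing_settings(config) would report smtp_server and sender.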

b. consumer.sh

#!/usr/bin/env python3

from confluent_kafka import Consumer
import json
import ssl
import smtplib
import datetime
from email.mime.text import MIMEText
from config import config


class MantaConsumer:
    def ts2str(self, ts):
        return datetime.datetime.fromtimestamp(ts / 1e3).strftime('%Y-%m-%d %H:%M:%S')

    def sendmail(self, body):
        message = MIMEText(body)
        message['From'] = config['sender']
        # Header values must be strings, so join the recipient list
        message['To'] = ', '.join(config['receivers'])
        message['Subject'] = config['subject']
        with smtplib.SMTP(config['smtp_server'], config['port']) as server:
            server.login(config['sender'], config['password'])
            # sendmail() expects the message as a string, not a MIMEText object
            server.sendmail(config['sender'], config['receivers'], message.as_string())

    def start_listener(self):
        consumer_config = {
            'bootstrap.servers': config['broker'],
            'group.id': config['group_id'],
            'auto.offset.reset': config['offset'],
            'enable.auto.commit': config['commit'],
            'max.poll.interval.ms': config['poll']
        }
        consumer = Consumer(consumer_config)
        consumer.subscribe([config['topic']])
        try:
            while True:
                msg = consumer.poll(timeout=10.0)
                if msg is None:
                    continue
                if msg.error():
                    print('Error reading message : {}'.format(msg.error()))
                    continue
                message = msg.value().decode('utf-8')
                m = json.loads(message)
                if m['name'] == config['event']:
                    m['timestamp'] = self.ts2str(m['timestamp'])
                    m['publicParameters']['executionEnd'] = self.ts2str(
                        m['publicParameters']['executionEnd'])
                    m['publicParameters']['addedToWaitingQueue'] = self.ts2str(
                        m['publicParameters']['addedToWaitingQueue'])
                    m['publicParameters']['executionStart'] = self.ts2str(
                        m['publicParameters']['executionStart'])
                    #print(json.dumps(m, indent=4, sort_keys=False))
                    self.sendmail(json.dumps(m, indent=4, sort_keys=False))
                consumer.commit()
        except Exception as ex:
            print("Kafka exception : {}".format(ex))
        finally:
            print("Closing consumer")
            consumer.close()


mc = MantaConsumer()
mc.start_listener()
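
Note that ts2str in the script above formats timestamps in the local time zone of the machine that runs the consumer. If the recipients are in different time zones, a fixed zone may be easier to interpret. The following is a minimal sketch of a UTC variant; the name ts2str_utc is hypothetical, and the input is assumed to be epoch milliseconds, as in the script.

```python
from datetime import datetime, timezone


def ts2str_utc(ts_ms: int) -> str:
    """Format an epoch-millisecond timestamp as a UTC date and time string."""
    moment = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


print(ts2str_utc(0))  # prints 1970-01-01 00:00:00
```

The output does not depend on the host's time zone settings, which also makes the function straightforward to test.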

c. systemd unit file

[Unit]
Description=Manta Event Mail Service
Requires=kafka.service
After=kafka.service

[Service]
Type=simple
ExecStart=</path-to-consumer.sh>
Restart=on-abnormal

[Install]
WantedBy=multi-user.target