How to Create an Apache Kafka Consumer for Manta Events That Will Send a Notification Email Using Spring Boot

In this guide, we will create a Spring Boot application that runs an Apache Kafka consumer subscribed to a Manta Event System topic, processes the incoming events, and sends email notifications when certain conditions are met. We will go through all the steps required to set up Manta Event System and implement the consumer. A project implementing all the steps discussed in this guide is included at the bottom of the page.

This page uses concepts discussed in Manta Event System and Connector: Apache Kafka.

Prerequisites

This guide assumes that the following infrastructure is already in place.

1. Setting Up Manta Event System

First, we need to set up the connector to Apache Kafka. To do so, follow these steps.

  1. Navigate to the configuration page dedicated to the Apache Kafka connector: Connections (1) → Add Connection (2) → MANTA Event System → Kafka Events Plugin (3).

  2. Enter the following configurations.

    1. Set Apache Kafka Connector Enabled to true.

    2. Set Bootstrap Servers to localhost:9092.

    3. Set Topic Name to manta-events.

  3. Click on the Save button in the upper-right corner.

  4. Restart the Manta Service Utility instance for the changes to take effect. (The connector is initialized while Manta Service Utility is starting.)

The required configuration in this guide

2. Implementing the Event System Consumer

In this section, we will go through the implementation of all required components and explain the steps required to implement them. We will use the following libraries to achieve this task.

a. Implementing the Model

Manta Events are always delivered as JSON payloads in the format defined in Manta Event System. In this guide, we will use the properties name and message. For more advanced filtering, it is possible to use the static fields or the dynamic public parameters that are defined for each event individually; see Events.

Our model class will therefore be implemented as follows.

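import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Date;

public class MantaEvent {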

    // FIELDS AS DEFINED IN https://mantatools.atlassian.net/wiki/spaces/MTKB/pages/3155951635/MANTA+Event+System#Event-message-format
    private final String name;
    private final String message;
    private final Date timestamp;
    private final String type;
    private final String priority;

    /**
     * Create MANTA Event object. JSON creator constructor used by Jackson mapper for mapping from String JSON payload.
     * @param name name of the event
     * @param message message of the event
     * @param timestamp timestamp of the event generation
     * @param type type of the event (see docs for possible values)
     * @param priority priority of the event (see docs for possible values)
     */
    @JsonCreator
    public MantaEvent(@JsonProperty("name") String name,
                      @JsonProperty("message") String message,
                      @JsonProperty("timestamp") Date timestamp,
                      @JsonProperty("type") String type,
                      @JsonProperty("priority") String priority) {
        this.name = name;
        this.message = message;
        this.timestamp = timestamp;
        this.type = type;
        this.priority = priority;
    }

    // getters...
}

Note the use of the @JsonCreator and @JsonProperty annotations from the Jackson library, which is a recommended way of processing JSON payloads in Java. They map the JSON fields to the class properties.

b. Implementing the Component for Sending an Email

To handle email communication, we will use the Apache Commons Email library. It greatly simplifies working with the native JavaMail API, which can still be used directly if Apache Commons Email is not an option.

This component will consist of a single public method, EmailSender.sendEventEmail(MantaEvent event), which will handle sending the email to the defined recipient.

To send an email through a custom application, we will need the following information.

In the provided project, these fields are empty with a TODO message for them to be filled with the user’s information as described in this guide.
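Judging by the names that the EmailSender snippet in this section refers to, the required settings can be sketched as a handful of constants. Every value below (host, port, addresses, password, subject) is a placeholder assumed for illustration, not a value taken from the attached project. In the project, the fields would sit in EmailSender in place of the `// ... EMAIL fields` comment; the separate holder class EmailSettings is hypothetical and is used here only so that the sketch compiles on its own.

```java
// Hypothetical holder for the values that EmailSender refers to.
// Every value below is a placeholder and must be replaced with real settings.
final class EmailSettings {

    private EmailSettings() {
        // constants only, no instances
    }

    // Host name of the outgoing (SMTP) mail server.
    static final String MAIL_HOST = "smtp.example.com";

    // Port of the SMTP server; 465 is commonly used for SMTP over SSL/TLS.
    static final int SMTP_PORT = 465;

    // Account used both as the sender address and as the SMTP login.
    static final String FROM_ADDRESS = "manta-notifications@example.com";

    // Password of the sender account. Avoid committing a real password to
    // version control; load it from the environment or a secret store instead.
    static final String FROM_ADDRESS_PASSWORD = "change-me";

    // Recipient of the notification emails.
    static final String TO_ADDRESS = "data-team@example.com";

    // Subject line shared by all notification emails.
    static final String EMAIL_SUBJECT = "Manta event notification";
}
```

The types follow from how the values are used: the port is passed to setSmtpPort as an int, and all other values are passed as strings.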

Once these fields are completed, we will implement the method for sending the actual email.

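import org.apache.commons.mail.DefaultAuthenticator;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.HtmlEmail;
import org.springframework.stereotype.Component;

@Component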
public class EmailSender {

    // ... EMAIL fields

    public void sendEventEmail(MantaEvent event) {
        try {
            HtmlEmail email = new HtmlEmail();
            email.setHostName(MAIL_HOST);
            email.setSmtpPort(SMTP_PORT);
            email.setAuthenticator(new DefaultAuthenticator(FROM_ADDRESS, FROM_ADDRESS_PASSWORD));
            email.setSSLOnConnect(true); // replaces setSSL(boolean), deprecated since Commons Email 1.3
            email.setFrom(FROM_ADDRESS);
            email.addTo(TO_ADDRESS);
            email.setSubject(EMAIL_SUBJECT);
            email.setHtmlMsg(event.getMessage()); // Set the message body equal to the event message
            email.send();
        } catch (EmailException e) {
            System.out.println("Failed to send the email.");
            e.printStackTrace();
        }
    }

}

In the snippet above, we have built the email object with our parameters, set the predefined subject, and set the message body of the email equal to the event message.

Note that the class is expected to be a Spring component, which is why it is annotated with @Component.
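Because setHtmlMsg treats the body as HTML, an event message that contains characters such as < or & may be rendered incorrectly by the mail client. One possible way to guard against this is to escape the event values before building the body. The following is a minimal sketch using only the JDK; the class EventEmailBody, its methods, and the chosen layout of the body are assumptions made for illustration and are not part of the attached project.

```java
// Hypothetical helper that builds an HTML email body from event values.
final class EventEmailBody {

    private EventEmailBody() {
        // static helpers only, no instances
    }

    /** Escapes the characters that have a special meaning in HTML; null becomes "". */
    static String escapeHtml(String text) {
        if (text == null) {
            return "";
        }
        StringBuilder escaped = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '&': escaped.append("&amp;"); break;
                case '<': escaped.append("&lt;"); break;
                case '>': escaped.append("&gt;"); break;
                case '"': escaped.append("&quot;"); break;
                case '\'': escaped.append("&#39;"); break;
                default: escaped.append(c);
            }
        }
        return escaped.toString();
    }

    /** Builds a small HTML fragment with the event name, priority, and message. */
    static String build(String name, String priority, String message) {
        return "<h3>" + escapeHtml(name) + "</h3>"
                + "<p>Priority: " + escapeHtml(priority) + "</p>"
                + "<p>" + escapeHtml(message) + "</p>";
    }
}
```

In sendEventEmail, the result could then be passed to the email object, for example email.setHtmlMsg(EventEmailBody.build(event.getName(), event.getPriority(), event.getMessage())).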

c. Implementing the Consumer

Lastly, we will implement the actual consumer class. This class also needs to be a Spring component, so we will annotate it with @Component. The consumer needs the EmailSender component, so we will inject this dependency through the constructor.

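import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component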
public class MantaEventsKafkaConsumer {
    // ...

    private final EmailSender emailSender;

    @Autowired
    public MantaEventsKafkaConsumer(EmailSender emailSender) {
        this.emailSender = emailSender;
    }

    // ...
}

Finally, we will implement the consumer method. Spring calls this method for each message received from the topic. Most importantly, the consumer method is annotated with @KafkaListener, which requires the names of the topics that we want to subscribe to. In our case, we will only subscribe to manta-events.

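import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component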
public class MantaEventsKafkaConsumer {
    // ...

    @KafkaListener(topics = "manta-events")
    public void processEvent(String event) throws JsonProcessingException {
        MantaEvent mantaEvent = objectMapper.readValue(event, MantaEvent.class);
        emailSender.sendEventEmail(mantaEvent);
    }

    // ...
}

The method maps the incoming JSON to an object and then passes it to the email component. For further configuration of @KafkaListener, consult the documentation.

It is also possible to implement more advanced filtering based on the different attributes of the event.

// Notify only about a specific event.
if ("WORKFLOW_EXECUTION_FINISHED_EVENT".equals(mantaEvent.getName())) {
    emailSender.sendEventEmail(mantaEvent);
}
// Notify only about events with a given priority.
if ("HIGH".equals(mantaEvent.getPriority())) {
    emailSender.sendEventEmail(mantaEvent);
}
// Notify only about events of a given type.
if ("ISSUE".equals(mantaEvent.getType())) {
    emailSender.sendEventEmail(mantaEvent);
}

The above list of examples is not exhaustive. Consult Manta Event System and Events for further options.
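If several of these checks are placed in the same consumer method one after another, an event that matches more than one of them triggers more than one email. One way to avoid this is to combine the conditions into a single decision. The sketch below uses only the values that appear in the examples above; the class EventFilter and the method shouldNotify are hypothetical names introduced for illustration.

```java
// Hypothetical helper that decides whether an event deserves a notification.
final class EventFilter {

    private EventFilter() {
        // static helper only, no instances
    }

    /**
     * Returns true when at least one of the conditions matches.
     * Putting the constant first keeps each comparison safe when a value is null.
     */
    static boolean shouldNotify(String name, String type, String priority) {
        return "WORKFLOW_EXECUTION_FINISHED_EVENT".equals(name)
                || "HIGH".equals(priority)
                || "ISSUE".equals(type);
    }

    public static void main(String[] args) {
        // Matches all three conditions, but still yields a single decision.
        System.out.println(shouldNotify("WORKFLOW_EXECUTION_FINISHED_EVENT", "ISSUE", "HIGH")); // prints true
        // Matches none of the conditions (these values are made up for the demo).
        System.out.println(shouldNotify("SOME_OTHER_EVENT", "OTHER", "LOW")); // prints false
    }
}
```

In the consumer, the email would then be sent once, guarded by a single call such as EventFilter.shouldNotify(mantaEvent.getName(), mantaEvent.getType(), mantaEvent.getPriority()).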

d. Adding Application Properties

In this Spring Boot application, we will require the following properties for the consumer to be fully functional. Add an application.properties file to the resources folder, and add the following properties there.

With the above setup, it is now possible to run the program and receive emails according to the implemented filtering.

Full Project

The attached ZIP file contains a Maven project that implements all the concepts discussed in this guide as a fully working Spring Boot application. There are TODO messages for fields that can only be filled out in the context of the local environment that the demo is run in.

spring-boot-kafka-example.zip