How to Create an Apache Kafka Consumer for Manta Events That Will Send a Notification Email Using Native API
This page uses concepts discussed in Manta Event System and Connector: Apache Kafka.
If possible, use the Spring Boot API, which provides additional functionality and is easier to set up. The guide for setting up a Spring Boot project can be found here.
Prerequisites
This guide assumes that the following infrastructure is already in place.
- Apache Kafka cluster with at least one server running on localhost:9092
- Topic called manta-events, defined in the cluster
- A local email server or access to one and two email addresses
Setting Up Manta Event System
- Navigate to the configuration page dedicated to the Apache Kafka connector: Connections → Add connection → MANTA Event System → Kafka Events Plugin.
- Enter the following configurations.
  - Bootstrap Servers - localhost:9092
  - Topic Name - manta-events
- Click the Save button.
- Restart the Manta Service Utility instance for the changes to take effect (the connector is initialized while Manta Service Utility is starting).
Implementing the Event System Consumer in Java
Use the following libraries to implement the Event System Consumer in Java:
- jackson-annotations: https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations
- jackson-databind: https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
a. Implementing the Model
Manta Events are always delivered as JSON payloads in the format defined in Manta Event System | Event-Message-Format. In this example, we will use the properties name and message. For more advanced filtering, it is possible to use the static fields or the dynamic public parameters, which are defined for each event individually; see Events.
Therefore, our model class will be implemented as follows.
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Date;

public class MantaEvent {
    // FIELDS AS DEFINED IN https://manta-io.atlassian.net/wiki/spaces/MTKB/pages/3155951635/MANTA+Event+System#Event-message-format
    private final String name;
    private final String message;
    private final Date timestamp;
    private final String type;
    private final String priority;

    /**
     * Create MANTA Event object. JSON creator constructor used by Jackson mapper for mapping from String JSON payload.
     * @param name name of the event
     * @param message message of the event
     * @param timestamp timestamp of the event generation
     * @param type type of the event (see docs for possible values)
     * @param priority priority of the event (see docs for possible values)
     */
    @JsonCreator
    public MantaEvent(@JsonProperty("name") String name,
                      @JsonProperty("message") String message,
                      @JsonProperty("timestamp") Date timestamp,
                      @JsonProperty("type") String type,
                      @JsonProperty("priority") String priority) {
        this.name = name;
        this.message = message;
        this.timestamp = timestamp;
        this.type = type;
        this.priority = priority;
    }

    // getters...
}
Note the use of the @JsonCreator and @JsonProperty annotations from the Jackson library, which is a recommended way of processing JSON payloads in Java. These are used to map the JSON fields to the class properties.
b. Implementing the Component for Sending an Email
For handling email communication, we will be using the Apache Commons Email library. This library greatly simplifies working with the native JavaMail API, which can still be used directly if Apache Commons Email is not an option.
This component will consist of a single public method, EmailSender.sendEventEmail(MantaEvent event), which will handle sending the email to the defined recipient.
To send an email through a custom application, we need the following information.
- Email address of the sender
- Password to the email address of the sender (this demo hardcodes the password, which is not recommended for production code)
- Email address of the recipient
- Host name of the email server
- SMTP port of the server
In the provided project, these fields are empty with a TODO message for them to be filled with the user’s information as described in this guide.
Once these fields are completed, we will implement the method for sending the actual email.
import org.apache.commons.mail.DefaultAuthenticator;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.HtmlEmail;

public class EmailSender {
    // ... EMAIL fields

    public void sendEventEmail(MantaEvent event) {
        try {
            HtmlEmail email = new HtmlEmail();
            email.setHostName(MAIL_HOST);
            email.setSmtpPort(SMTP_PORT);
            email.setAuthenticator(new DefaultAuthenticator(FROM_ADDRESS, FROM_ADDRESS_PASSWORD));
            email.setSSL(true);
            email.setFrom(FROM_ADDRESS);
            email.addTo(TO_ADDRESS);
            email.setSubject(EMAIL_SUBJECT);
            email.setHtmlMsg(event.getMessage()); // Set the message body equal to the event message
            email.send();
        } catch (EmailException e) {
            System.out.println("Failed to send the email.");
            e.printStackTrace();
        }
    }
}
In the preceding snippet, we built the email object with our parameters, set the predefined subject, and set the message body of the email equal to the event message.
c. Implementing the Apache Kafka Consumer
To implement the Apache Kafka consumer, we will be using the native Apache Kafka Clients library. This library only supports blocking consumers, so a consumer that should run continuously has to poll in an infinite loop. For asynchronous message consumption, use the Spring Kafka library in a Spring Boot application.
To implement the consumer, we need the following information.
- Name of the event system topic: from the prerequisites, we are using a topic called manta-events
- Bootstrap server: from the prerequisites, we have a Kafka broker running on localhost:9092
First, we are going to define the native consumer. We will implement this using a static factory method.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MantaEventsKafkaConsumer {
    private static final String EVENT_SYSTEM_TOPIC = "manta-events";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static MantaEventsKafkaConsumer createKafkaConsumer(EmailSender emailSender) {
        // Configure the consumer
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "MANTA-EVENT-SYSTEM-CONSUMERS");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Create the consumer
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
        // Subscribe to the Event System topic
        consumer.subscribe(Collections.singletonList(EVENT_SYSTEM_TOPIC));
        return new MantaEventsKafkaConsumer(consumer, emailSender);
    }

    // ...
}
In the preceding snippet, we have first configured the consumer.
- Configure the bootstrap servers.
- Set the group ID of the consumer group that the consumer is running in.
- Set the key deserializer to StringDeserializer since the key in all Manta Events is a String.
- Set the value deserializer to StringDeserializer since the value in all Manta Events is a JSON in a String.
Then, we created the native Kafka consumer and subscribed to the manta-events topic.
Lastly, we will implement the method that the consumer will use to consume and handle the incoming messages. As stated before, for a consumer to run at all times, it must poll in an infinite loop. We will be using the poll() method of the Kafka consumer with a timeout of 1 second. If no messages are received in this time, we will poll again. If messages are received, the consumer will loop through them, map each message value (the JSON payload of the Manta Event) to an object, and pass it to the email sender component.
import static java.time.temporal.ChronoUnit.SECONDS;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class MantaEventsKafkaConsumer {
    // ...

    public void run() {
        while (true) {
            // Poll for a second (SECONDS.getDuration() is a Duration of one second)
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(SECONDS.getDuration());
            // If no records received, continue
            if (consumerRecords.count() == 0) {
                continue;
            }
            // For each received event, map it to an object and send the email
            consumerRecords.forEach(event -> {
                MantaEvent mantaEvent = null;
                try {
                    mantaEvent = objectMapper.readValue(event.value(), MantaEvent.class);
                } catch (JsonProcessingException e) {
                    // Skip payloads that cannot be mapped to a MantaEvent
                    e.printStackTrace();
                    return;
                }
                emailSender.sendEventEmail(mantaEvent);
            });
        }
    }
}
Using this method, we can implement more advanced filtration. For example, we can filter:
- by event name
    if (mantaEvent.getName().equals("WORKFLOW_EXECUTION_FINISHED_EVENT")) {
        emailSender.sendEventEmail(mantaEvent);
    }
- by event priority
    if (mantaEvent.getPriority().equals("HIGH")) {
        emailSender.sendEventEmail(mantaEvent);
    }
- or by event type
    if (mantaEvent.getType().equals("ISSUE")) {
        emailSender.sendEventEmail(mantaEvent);
    }
Note that the list of preceding examples is not exhaustive. For further filtration options, consult the documentation at Manta Event System | Event-Message-Format and Events.
With this setup, it is now possible to run the program and receive emails according to the implemented filtration.
d. Full Project
The attached compressed file contains a Maven project that implements all the concepts discussed in this guide. There are TODO messages for the fields that can only be filled out in the context of the local environment that the demo is run in.
Implementing the Event System Consumer in Python
Dependencies: confluent-kafka
To install it, run: python -m pip install confluent-kafka
a. config.py
config = {
    "broker": "localhost:9092",
    "topic": "manta-events",
    "group_id": "Mailer",
    "event": "WORKFLOW_EXECUTION_FINISHED_EVENT",
    "offset": "largest",
    "commit": "false",
    "poll": "86400000",
    "port": 25,
    "smtp_server": "",
    "sender": "",
    "password": "",
    "receivers": ["a@a.com", "b@a.com"],
    "subject": "Manta Workflow Execution Status"
}
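Since hardcoding the sender's password is not recommended outside of a demo, one option is to overlay the sensitive values from environment variables on top of config.py. The following is a minimal sketch under stated assumptions: the helper name with_env_overrides and the variable names MANTA_SMTP_SERVER, MANTA_SMTP_SENDER, and MANTA_SMTP_PASSWORD are hypothetical and are not part of the original project.

```python
import os

# Hypothetical mapping from config keys to environment variable names
ENV_OVERRIDES = {
    "smtp_server": "MANTA_SMTP_SERVER",
    "sender": "MANTA_SMTP_SENDER",
    "password": "MANTA_SMTP_PASSWORD",
}


def with_env_overrides(config, environ=None):
    """Return a copy of config where non-empty environment values replace the file values."""
    environ = os.environ if environ is None else environ
    merged = dict(config)
    for key, variable in ENV_OVERRIDES.items():
        value = environ.get(variable)
        if value:  # ignore unset or empty variables
            merged[key] = value
    return merged
```

With this helper, a line such as config = with_env_overrides(config) at the end of config.py would replace only the keys for which a variable is set and leave the rest untouched.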
b. consumer.sh
#!/usr/bin/env python3
from confluent_kafka import Consumer
import json
import ssl
import smtplib
import datetime
from email.mime.text import MIMEText
from config import config


class MantaConsumer:
    def ts2str(self, ts):
        # Timestamps are divided by 1e3, i.e. treated as epoch milliseconds
        return datetime.datetime.fromtimestamp(ts / 1e3).strftime('%Y-%m-%d %H:%M:%S')

    def sendmail(self, body):
        message = MIMEText(body)
        message['From'] = config['sender']
        # Header values must be strings, so join the list of recipients
        message['To'] = ', '.join(config['receivers'])
        message['Subject'] = config['subject']
        with smtplib.SMTP(config['smtp_server'], config['port']) as server:
            server.login(config['sender'], config['password'])
            # sendmail() expects the message as a string, not a MIMEText object
            server.sendmail(config['sender'], config['receivers'], message.as_string())

    def start_listener(self):
        consumer_config = {
            'bootstrap.servers': config['broker'],
            'group.id': config['group_id'],
            'auto.offset.reset': config['offset'],
            'enable.auto.commit': config['commit'],
            'max.poll.interval.ms': config['poll']
        }
        consumer = Consumer(consumer_config)
        consumer.subscribe([config['topic']])
        try:
            while True:
                msg = consumer.poll(timeout=10.0)
                if msg is None:
                    continue
                if msg.error():
                    print('Error reading message : {}'.format(msg.error()))
                    continue
                message = msg.value().decode('utf-8')
                m = json.loads(message)
                # Only events with the configured name trigger an email
                if m['name'] == config['event']:
                    m['timestamp'] = self.ts2str(m['timestamp'])
                    m['publicParameters']['executionEnd'] = self.ts2str(
                        m['publicParameters']['executionEnd'])
                    m['publicParameters']['addedToWaitingQueue'] = self.ts2str(
                        m['publicParameters']['addedToWaitingQueue'])
                    m['publicParameters']['executionStart'] = self.ts2str(
                        m['publicParameters']['executionStart'])
                    #print(json.dumps(m, indent=4, sort_keys=False))
                    self.sendmail(json.dumps(m, indent=4, sort_keys=False))
                # Auto-commit is disabled in config.py, so commit manually
                consumer.commit()
        except Exception as ex:
            print("Kafka exception : {}".format(ex))
        finally:
            print("Closing consumer")
            consumer.close()


mc = MantaConsumer()
mc.start_listener()
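The filtering and formatting steps of the listener can also be kept separate from Kafka and SMTP, which makes them easy to try out in isolation. The following is a sketch under stated assumptions, not the structure of the script itself: the function names format_event and build_message are hypothetical, timestamps are assumed to be epoch milliseconds (as the division by 1e3 in ts2str suggests), and times are rendered in UTC rather than local time so that the output is reproducible.

```python
import datetime
import json
from email.mime.text import MIMEText

# Public parameters that the consumer script treats as timestamps
TIME_PARAMETERS = ("executionEnd", "addedToWaitingQueue", "executionStart")


def ts2str(ts_millis):
    """Render an epoch-millisecond timestamp as a UTC string (assumption: millisecond input)."""
    moment = datetime.datetime.fromtimestamp(ts_millis / 1e3, tz=datetime.timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def format_event(payload, wanted_name):
    """Return a readable JSON body for a matching event, or None if it is filtered out."""
    event = json.loads(payload)
    if event.get("name") != wanted_name:
        return None
    event["timestamp"] = ts2str(event["timestamp"])
    parameters = event.get("publicParameters", {})
    for key in TIME_PARAMETERS:
        if key in parameters:
            parameters[key] = ts2str(parameters[key])
    return json.dumps(event, indent=4, sort_keys=False)


def build_message(body, sender, receivers, subject):
    """Build the email; the To header must be one string, so the recipient list is joined."""
    message = MIMEText(body)
    message["From"] = sender
    message["To"] = ", ".join(receivers)
    message["Subject"] = subject
    return message
```

In the listener loop, the body returned by format_event could be passed to build_message, and the result of as_string() handed to smtplib together with the recipient list. Events for which format_event returns None would simply be skipped.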
c. systemd unit file
[Unit]
Description=Manta Event Mail Service
Requires=kafka.service
After=kafka.service
[Service]
Type=simple
ExecStart=</path-to-consumer.sh>
Restart=on-abnormal
[Install]
WantedBy=multi-user.target