How to use IBM Event Streams and Kafka Streams to aggregate values in a customer rewards system.
Event-driven architecture is becoming increasingly popular in big data and cloud environments. Built on open-source Apache Kafka, IBM Event Streams is an event-streaming platform that helps you build smart apps that can react to events as they happen. It supports Kafka Streams, a stream-processing library for building applications and microservices that process, analyze, and store data in Kafka clusters.
This tutorial demonstrates how to implement a customer rewards system based on expenses using IBM Event Streams and Kafka Streams. The system will use customer transaction expenses as input, convert the expenses to reward points, then record the accumulated points to Event Streams topics on a daily and monthly basis. The sample producer and stream projects will be written as a Spring Boot application using Kotlin.
To implement the system, we will walk you through the following tasks:
- Set up an IBM Cloud Event Streams instance.
- Produce customer expense data to a topic in the Event Streams instance.
- Set up and implement a Kafka Streams aggregation function and save the output to other topics in the Event Streams instance.
- Verify the result in new topics using a consumer.
Prerequisites
To run the sample project, you need to install JDK 1.8 on your system and set the JAVA_HOME environment variable to point to your JDK installation.
You also need to install Maven on your local machine (follow this installation guide). Then set the M2_HOME environment variable to point to your Maven directory.
Set up your Event Streams Service in IBM Cloud
- Log in to the IBM Cloud console, select the Event Streams service from the Catalog, and create an instance in your account. Select the Standard plan for this sample system.
- After an Event Streams instance has been provisioned, navigate to the dashboard for the instance and select Topics from the left navigation. Click Create topic and create a new topic with the following configuration:
- Topic name: expenses; Partitions: 1; Message retention: 7 days
- Repeat the previous step to create three more topics:
- Topic name: daily_points; Partitions: 1; Message retention: 30 days
- Topic name: monthly_points; Partitions: 1; Message retention: 30 days
- Topic name: total_points; Partitions: 1; Message retention: 30 days
- Select Service credentials from the left navigation and click New credential to create a new credential. You must grant the credential the Manager role because the Kafka Streams application will use it to create new topics when it runs later.
Click View credentials and record the values of “apikey” and “kafka_brokers_sasl”, which will be used in the properties file later.
Produce sample messages to Event Streams
The sample producer project is a Spring Boot application. To run it, you need to download the source code from the GitHub kafka-rewards-producer repository.
The main configuration file for the application is src/main/resources/application.properties. You must replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the Service credentials created earlier, and replace <apikey> with the value of “apikey”. Below is a sample of the properties file:
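The repository defines its own keys, so treat the following as a minimal sketch that uses the standard Spring Boot for Apache Kafka property names together with the transaction.file.location entry described below; the angle-bracket placeholders are the values you replace:

```properties
# Event Streams connection (replace the placeholders with your credential values)
spring.kafka.bootstrap-servers=<kafka_brokers_sasl>
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<apikey>";

# CSV file that holds the sample transactions
transaction.file.location=transaction.csv
```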
You may notice that the configuration file contains a “transaction.file.location” entry that points to transaction.csv.
In this sample system, we assume a transaction message with the following fields will be created when a customer has an expense:
- TransactionId
- Date
- AccountId
- Amount
To simplify the process, we use transaction.csv to store multiple transaction records. The producer application will send every record to the Event Streams expenses topic as a message.
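For illustration only, a record in transaction.csv could look like the following made-up row, assuming the columns follow the field order above:

```csv
TransactionId,Date,AccountId,Amount
T0001,2021-06-01,AC01,35.50
```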
You can run the producer application with the following commands in a terminal:
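Assuming a standard Spring Boot Maven project layout, the commands look something like this:

```shell
cd kafka-rewards-producer
mvn clean spring-boot:run
```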
If the application runs properly, you will see logs like the ones below, indicating that messages have been published to the Kafka topic:
Set up and implement Kafka Streams
The logic for the daily and monthly rewards summaries is the core of this project. We created two stream processors to implement the daily and monthly aggregations using the spring-kafka and kafka-streams libraries. The source code is available in the GitHub kafka-rewards-stream repository. The application consumes messages from the input topic “expenses”, aggregates the amount in each message using the specified keys, and saves the aggregated values to the output topics:
Create the Kafka Streams builders
The main configuration file for the application is src/main/resources/application.properties. You must replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the Service credentials created in your IBM Event Streams instance and replace <apikey> with the value of “apikey”.
The class src/main/kotlin/com/lilywang/kafkarewardsstream/config/KafkaConfig.kt loads the configuration from application.properties, then creates two identical stream builders for the daily and monthly aggregations. Below is sample code for creating the dailyRewardsKafkaStreamsBuilder:
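The repository's exact code may differ; the following is a simplified sketch of the idea using spring-kafka's StreamsBuilderFactoryBean and KafkaStreamsConfiguration. The property names kafka.bootstrap.servers and kafka.api.key and the application IDs are assumptions for illustration:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.KafkaStreamsConfiguration
import org.springframework.kafka.config.StreamsBuilderFactoryBean

@Configuration
class KafkaConfig(
    // Assumed property names; use the keys defined in application.properties
    @Value("\${kafka.bootstrap.servers}") private val bootstrapServers: String,
    @Value("\${kafka.api.key}") private val apiKey: String
) {

    // Build a Kafka Streams configuration with the SASL settings Event Streams requires:
    // SASL_SSL, the PLAIN mechanism, user "token", and the API key as the password.
    private fun streamsConfig(applicationId: String) = KafkaStreamsConfiguration(
        mapOf<String, Any>(
            StreamsConfig.APPLICATION_ID_CONFIG to applicationId,
            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name,
            StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name,
            "security.protocol" to "SASL_SSL",
            "sasl.mechanism" to "PLAIN",
            "sasl.jaas.config" to
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=\"token\" password=\"$apiKey\";"
        )
    )

    // One builder per processor, so each aggregation runs as its own Streams application
    @Bean
    fun dailyRewardsKafkaStreamsBuilder() =
        StreamsBuilderFactoryBean(streamsConfig("daily-rewards-aggregation"))

    @Bean
    fun monthlyRewardsKafkaStreamsBuilder() =
        StreamsBuilderFactoryBean(streamsConfig("monthly-rewards-aggregation"))
}
```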
Implement Kafka Streams aggregation
To calculate the daily reward points, the class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/DailyRewardPointAggregator.kt consumes the incoming transactions stored in the “expenses” topic, aggregates the expense amount using the key “AccountId#Date”, and saves the aggregated value to the “daily_points” Event Streams topic.
The core function of the code is below:
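The snippet below is a simplified sketch of that logic rather than the repository's verbatim code. It assumes JSON-encoded transaction messages (parsed with jackson-module-kotlin), String/Double serdes, and a one-point-per-currency-unit conversion:

```kotlin
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Produced

// Assumed message shape; field names follow the transaction fields listed earlier
data class Transaction(
    @JsonProperty("TransactionId") val transactionId: String,
    @JsonProperty("Date") val date: String,
    @JsonProperty("AccountId") val accountId: String,
    @JsonProperty("Amount") val amount: Double
)

private val mapper = jacksonObjectMapper()

fun buildDailyRewardsTopology(builder: StreamsBuilder) {
    builder
        .stream("expenses", Consumed.with(Serdes.String(), Serdes.String()))
        // Re-key every transaction by "AccountId#Date" so all expenses of one
        // account on one day fall into the same aggregation group.
        .map { _, value ->
            val tx = mapper.readValue<Transaction>(value)
            KeyValue("${tx.accountId}#${tx.date}", tx.amount)
        }
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        // Sum the amounts per key; 1 reward point per unit of currency is assumed here.
        .reduce { runningTotal, amount -> runningTotal + amount }
        .toStream()
        .mapValues { points -> points.toString() }
        // Write the accumulated daily points to the output topic.
        .to("daily_points", Produced.with(Serdes.String(), Serdes.String()))
}
```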
The monthly aggregation class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/MonthlyRewardPointAggregator.kt has the same logic as the daily aggregation, except the aggregation key is different — it uses “AccountId#YearMonth”:
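Continuing the sketch above and reusing its Transaction class and mapper, only the key format and output topic change:

```kotlin
fun buildMonthlyRewardsTopology(builder: StreamsBuilder) {
    builder
        .stream("expenses", Consumed.with(Serdes.String(), Serdes.String()))
        // Re-key by "AccountId#YearMonth", e.g. "AC01#2021-06"; the date is assumed to be
        // in yyyy-MM-dd format, so the first 7 characters give the year and month.
        .map { _, value ->
            val tx = mapper.readValue<Transaction>(value)
            KeyValue("${tx.accountId}#${tx.date.substring(0, 7)}", tx.amount)
        }
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .reduce { runningTotal, amount -> runningTotal + amount }
        .toStream()
        .mapValues { points -> points.toString() }
        // The monthly totals go to the "monthly_points" topic instead of "daily_points".
        .to("monthly_points", Produced.with(Serdes.String(), Serdes.String()))
}
```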
Run the Kafka Streams application
You can run the stream application via the following commands in a terminal:
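As with the producer, assuming a standard Spring Boot Maven project layout:

```shell
cd kafka-rewards-stream
mvn clean spring-boot:run
```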
If the application runs correctly, you will see logs like these:
View the result using a Kafka Consumer
A simple way to test and view the results is to use a Kafka consumer to read the messages generated in the daily_points and monthly_points topics. We use the kcat (formerly kafkacat) utility in this tutorial.
- Install kcat with the following command:
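For example, on macOS with Homebrew (see the kcat documentation for other platforms):

```shell
brew install kcat
```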
- Configure kcat by creating a file “~/.config/kcat.conf” with the following lines:
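The file holds standard librdkafka client properties; replace the placeholders with your credential values (the broker list as a comma-separated string):

```properties
bootstrap.servers=<kafka_brokers_sasl>
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=token
sasl.password=<apikey>
```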
- View the results in the output topics with these commands:
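For example, the following reads each topic from the beginning (using the connection details from kcat.conf) and prints the key and value of every message:

```shell
kcat -C -t daily_points -f '%k : %s\n' -e
kcat -C -t monthly_points -f '%k : %s\n' -e
```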
The output will look like this:
- To show that the kafka-rewards-stream application continuously aggregates new incoming transactions, you can edit the application.properties file in the kafka-rewards-producer application with the line below:
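For example, pointing the producer at a CSV file containing the new transactions (the path below is a placeholder; use the file shipped with the repository):

```properties
transaction.file.location=<path to the CSV file containing the new transactions>
```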
Then, generate new messages to the “expenses” topic by running the following:
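This is the same way the producer was run earlier:

```shell
cd kafka-rewards-producer
mvn clean spring-boot:run
```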
While the kafka-rewards-stream application is still running, it will continuously process the new messages from the “expenses” topic and aggregate them into the daily_points and monthly_points output topics. A log similar to the one below will be printed:
Using the kcat utility to check the messages in the output topics, you will see new messages with recalculated values:
What’s next?
In this tutorial, we used kcat to verify the messages generated by Kafka Streams. Going further, you can use a Kafka Connect sink connector to copy data from the topics into a JDBC database or an IBM Cloud Object Storage bucket to store the data permanently.
You may refer to the Using Kafka Connect with Event Streams documentation to learn how to set up Kafka Connect and then follow the existing IBM resources to set up a sink connector:
You may also refer to the following documents to implement your own Kafka Streams with IBM Event Streams: