September 19, 2022 By Lily Wang 7 min read

How to use IBM Event Streams and Kafka Streams to aggregate values in a customer rewards system.

Event-driven architecture is becoming more and more popular with big data and cloud environments. Built on open-source Apache Kafka, IBM Event Streams is an event-streaming platform that helps you build smart apps that can react to events as they happen. It supports Kafka Streams, which is a stream processing library for building applications and microservices that process, analyze and store data in Kafka clusters.

This tutorial demonstrates how to implement a customer rewards system based on expenses using IBM Event Streams and Kafka Streams. The system will use customer transaction expenses as input, convert the expenses to reward points, then record the accumulated points to Event Streams topics on a daily and monthly basis. The sample producer and stream projects will be written as a Spring Boot application using Kotlin.

To implement the system, we will walk you through the following tasks:

  1. Set up an IBM Cloud Event Streams instance.
  2. Produce customer expense data to a topic in the Event Streams instance.
  3. Set up and implement a Kafka Streams aggregation function and save the output to other topics in the Event Streams instance.
  4. Verify the results in the output topics using a consumer.


To run the sample project, you need to install JDK 1.18 on your system and set the JAVA_HOME environment variable to point to your JDK installation.

You also need to install Maven on your local machine (follow this installation guide). Then set the M2_HOME environment variable to point to your Maven directory.

Set up your Event Streams Service in IBM Cloud

  1. Log in to the IBM Cloud console, click the Event Streams service in the Catalog to create an instance in your account, and select the Standard plan for this sample system.
  2. After an Event Streams instance has been provisioned, navigate to the dashboard for the instance and select Topics from the left navigation. Click Create topic and create a new topic with the following configuration:
    • Topic name: expenses; Partitions: 1; Message retention: 7 days
  3. Repeat Step 2 and create three more topics:
    • Topic name: daily_points; Partitions: 1; Message retention: 30 days
    • Topic name: monthly_points; Partitions: 1; Message retention: 30 days
    • Topic name: total_points; Partitions: 1; Message retention: 30 days
  4. Select Service credentials from the left navigation and create a new credential by clicking New credential. You need to grant the credential the Manager role because the Kafka Streams application will use it to create new topics when it runs later.

    Click View credentials and record the values of “apikey” and “kafka_brokers_sasl”, which will be used in the properties file later.

Produce sample messages to Event Streams

The sample producer project is a Spring Boot application. To run it, you need to download the source code from the GitHub kafka-rewards-producer repository.

The main configuration file for the application is in src/main/resources/. Replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the service credentials created earlier, and replace <apikey> with the value of “apikey”. Below is a sample of the properties file:

#Transaction CSV File location

#Bootstrap server for kafka

#kafka topic to publish message

#kafka cluster security configuration
kafka.sasl.mechanism=PLAIN required username="token" password="<apikey>";
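
For orientation, the security settings that a Kafka client typically needs for Event Streams can be sketched in Kotlin as below. This is a minimal sketch, not the sample project's configuration: the function name eventStreamsClientProperties is hypothetical, the keys are the standard Kafka client property names rather than the keys used in the project's properties file, the broker host names are placeholders, and it assumes “kafka_brokers_sasl” is available as a list of host:port strings.

```kotlin
import java.util.Properties

// Hypothetical helper: builds standard Kafka client settings for SASL_SSL with PLAIN.
// Assumes brokers is the list of host:port strings taken from "kafka_brokers_sasl".
fun eventStreamsClientProperties(brokers: List<String>, apiKey: String): Properties {
    val props = Properties()
    // Kafka expects one comma-separated string of brokers
    props.setProperty("bootstrap.servers", brokers.joinToString(","))
    props.setProperty("security.protocol", "SASL_SSL")
    props.setProperty("sasl.mechanism", "PLAIN")
    // The username is the literal "token"; the password is the API key
    props.setProperty(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"token\" password=\"$apiKey\";"
    )
    return props
}

fun main() {
    val props = eventStreamsClientProperties(
        listOf("broker-0.example.com:9093", "broker-1.example.com:9093"), // placeholder hosts
        "<apikey>"
    )
    props.stringPropertyNames().sorted().forEach { println("$it=${props.getProperty(it)}") }
}
```

Keeping the API key as a function argument, rather than a literal in source code, makes it easier to load the value from an environment variable or a secrets store.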

You may notice that the configuration file contains an entry, “transaction.file.location”, that points to transaction.csv.

In this sample system, we assume a transaction message with the following fields will be created when a customer has an expense:

  • TransactionId
  • Date
  • AccountId
  • Amount

To simplify the process, we use transaction.csv to store multiple transaction records. The producer application will send every record to the Event Streams expenses topic as a message.
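
To make the record-to-message step concrete, here is a minimal sketch of parsing one CSV record into a transaction object. It is an illustration under assumptions, not the producer's actual code: the column order (TransactionId, Date, AccountId, Amount), the field types and the function name parseTransaction are all hypothetical, chosen to match the shape of the log output shown later in this tutorial.

```kotlin
import java.math.BigDecimal
import java.time.LocalDateTime

// Assumed shape of a transaction message; the field types are a guess
data class Transaction(
    val transactionId: Long,
    val date: LocalDateTime,
    val accountId: String,
    val amount: BigDecimal
)

// Hypothetical parser: expects "TransactionId,Date,AccountId,Amount" on each line
fun parseTransaction(line: String): Transaction {
    val fields = line.split(",").map { it.trim() }
    require(fields.size == 4) { "Expected 4 fields but found ${fields.size}: $line" }
    return Transaction(
        transactionId = fields[0].toLong(),
        date = LocalDateTime.parse(fields[1]), // ISO-8601 timestamp, e.g. 2022-01-03T14:13
        accountId = fields[2],
        amount = BigDecimal(fields[3])
    )
}

fun main() {
    println(parseTransaction("2,2022-01-03T14:13,1111,12.11"))
    // prints: Transaction(transactionId=2, date=2022-01-03T14:13, accountId=1111, amount=12.11)
}
```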

You can run the producer application with the following commands in a terminal:

git clone <kafka-rewards-producer repository URL>
cd kafka-rewards-producer
./mvnw spring-boot:run

If the application runs properly, you will see logs like the ones below, which indicate messages have been published to the Kafka topic:

2022-08-08 23:24:36.076  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=2, date=2022-01-03T14:13, accountId=1111, amount=12.11) to kafka
2022-08-08 23:24:36.082  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=3, date=2022-01-04T15:13, accountId=1111, amount=33.22) to kafka
2022-08-08 23:24:36.092  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=4, date=2022-01-05T11:13, accountId=1111, amount=25.00) to kafka

Set up and implement Kafka Streams

The logic for the daily and monthly rewards summaries is the core of this project. We created two stream processors to implement daily aggregation and monthly aggregation using the spring-kafka and kafka-streams libraries. The source code is available in the GitHub kafka-rewards-stream repository. Each processor consumes messages from the input topic “expenses”, aggregates the amount in each message by the specified key and saves the aggregated value to an output topic.

Create the Kafka Streams builder

The main configuration file for the application is in src/main/resources/. Replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the service credentials created in your IBM Event Streams instance, and replace <apikey> with the value of “apikey”.

The class src/main/kotlin/com/lilywang/kafkarewardsstream/config/KafkaConfig.kt loads the configuration from that file, then creates two identical stream builders for daily aggregation and monthly aggregation. Below is the sample code that creates dailyRewardsKafkaStreamsBuilder:

    // Registered as a Spring bean (annotation assumed)
    @Bean
    fun dailyRewardsStreamConfig(kafkaConfigProperties: Properties): KafkaStreamsConfiguration {
        val props: MutableMap<String, Any> = HashMap()
        props[APPLICATION_ID_CONFIG] = "daily-rewards-stream"
        props.putAll(kafkaConfigProperties.toMap() as Map<String, String>)
        return KafkaStreamsConfiguration(props)
    }

    // Registered as a Spring bean (annotation assumed)
    @Bean
    fun dailyRewardsKafkaStreamsBuilder(
        dailyRewardsStreamConfig: KafkaStreamsConfiguration,
        customizerProvider: ObjectProvider<StreamsBuilderFactoryBeanCustomizer>,
        configurerProvider: ObjectProvider<StreamsBuilderFactoryBeanConfigurer>
    ): StreamsBuilderFactoryBean {
        val fb = StreamsBuilderFactoryBean(dailyRewardsStreamConfig)
        // Apply every registered configurer and remember which ones ran
        // (lambda bodies assumed to mirror spring-kafka's default streams builder)
        val configuredBy: MutableSet<StreamsBuilderFactoryBeanConfigurer> = HashSet()
        configurerProvider.orderedStream().forEach { configurer: StreamsBuilderFactoryBeanConfigurer ->
            configurer.configure(fb)
            configuredBy.add(configurer)
        }
        // Apply the single customizer, if any, unless it already ran as a configurer
        val customizer = customizerProvider.ifUnique
        if (customizer != null && !configuredBy.contains(customizer)) {
            customizer.configure(fb)
        }
        return fb
    }

Implement Kafka Streams aggregation

To calculate the daily reward points, the class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/DailyRewardPointAggregator.kt will consume the incoming transactions stored in the “expenses” topic, aggregate the expense amount using key “AccountId#Date” and save the aggregated value to “daily_points” Event Streams topic.

The core function of the code is below:

fun buildPipeline(dailyRewardsKafkaStreamsBuilder: StreamsBuilder) {
        //Consume from topic expenses
        val transactionStream: KStream<String, Transaction> = dailyRewardsKafkaStreamsBuilder
            .stream("expenses", Consumed.with(STRING_SERDE, customSerdes.transactionSerde()))

        //Aggregate to DailyRewards by key AccountId#Date
        val dailyRewards: KTable<String, DailyRewards> = transactionStream
            .map { accountId, transaction ->
                //Key and date expressions are assumed: the calendar day of the transaction timestamp
                KeyValue(
                    "$accountId#${transaction.date.toLocalDate()}",
                    DailyRewards(
                        accountId = transaction.accountId,
                        date = transaction.date.toLocalDate(),
                        rewardsPoint = transaction.amount.toLong()
                    )
                )
            }
            .groupByKey(Grouped.with(Serdes.String(), customSerdes.dailyRewardsSerde()))
            .reduce(
                //Assumed reducer: add the new points to the running total for the key
                { total, next -> total.copy(rewardsPoint = total.rewardsPoint + next.rewardsPoint) },
                Materialized.with(STRING_SERDE, customSerdes.dailyRewardsSerde())
            )

        //Publish result to topic daily_points
        dailyRewards.toStream().to("daily_points", Produced.with(Serdes.String(), customSerdes.dailyRewardsSerde()))
}
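
The effect of this pipeline can be illustrated without a Kafka cluster. The following is a minimal plain-Kotlin sketch, not the project's code: the Expense class and the functions dailyKey, rewardPoints and aggregateDaily are hypothetical names, and the rule of one point per whole currency unit is inferred from the amount.toLong() call above.

```kotlin
import java.math.BigDecimal
import java.time.LocalDateTime

// Hypothetical stand-in for the Transaction message
data class Expense(val accountId: String, val date: LocalDateTime, val amount: BigDecimal)

// Key format from the tutorial: AccountId#Date
fun dailyKey(e: Expense): String = "${e.accountId}#${e.date.toLocalDate()}"

// toLong() truncates the fractional part, so 12.11 becomes 12 points
fun rewardPoints(amount: BigDecimal): Long = amount.toLong()

// Group by key and sum the points, which is what the stream does per key over time
fun aggregateDaily(expenses: List<Expense>): Map<String, Long> =
    expenses.groupingBy { dailyKey(it) }.fold(0L) { total, e -> total + rewardPoints(e.amount) }

fun main() {
    val expenses = listOf(
        Expense("1111", LocalDateTime.parse("2022-01-03T14:13"), BigDecimal("12.11")),
        Expense("1111", LocalDateTime.parse("2022-01-03T18:40"), BigDecimal("7.95")),
        Expense("1111", LocalDateTime.parse("2022-01-04T15:13"), BigDecimal("33.22"))
    )
    aggregateDaily(expenses).forEach { (key, points) -> println("$key -> $points") }
    // prints:
    // 1111#2022-01-03 -> 19
    // 1111#2022-01-04 -> 33
}
```

Note that truncating each amount before summing (12 + 7 = 19) gives a different result from summing first and truncating afterwards (20.06 becomes 20), so the order of those two steps is part of the rewards policy.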

The monthly aggregation class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/MonthlyRewardPointAggregator.kt has the same logic as the daily aggregation, except that the aggregation key is “AccountId#YearMonth”:

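        //Aggregate to MonthlyRewards by key AccountId#YearMonth
        val monthlyRewards: KTable<String, MonthlyRewards> = transactionStream
            .map { accountId, transaction ->
                //transaction.date as the source of year and month is assumed
                KeyValue(
                    "$accountId#${transaction.date.let { YearMonth.of(it.year, it.monthValue).toString() }}",
                    MonthlyRewards(
                        accountId = transaction.accountId,
                        month = YearMonth.of(transaction.date.year, transaction.date.monthValue),
                        rewardsPoint = transaction.amount.toLong()
                    )
                )
            }
            .groupByKey(Grouped.with(Serdes.String(), customSerdes.monthlyRewardsSerde()))
            .reduce(
                //Assumed reducer: add the new points to the running total for the key
                { total, next -> total.copy(rewardsPoint = total.rewardsPoint + next.rewardsPoint) },
                Materialized.with(STRING_SERDE, customSerdes.monthlyRewardsSerde())
            )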
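
The key derivation is the only part that changes between the two aggregators, so it is worth seeing in isolation. The sketch below is a hedged illustration: monthlyKey is a hypothetical helper, and it uses YearMonth.from(date), which is equivalent to YearMonth.of(date.year, date.monthValue) for a LocalDateTime.

```kotlin
import java.time.LocalDateTime
import java.time.YearMonth

// Hypothetical helper: builds the AccountId#YearMonth key for a transaction timestamp
fun monthlyKey(accountId: String, date: LocalDateTime): String =
    "$accountId#${YearMonth.from(date)}"

fun main() {
    // Two different days in January map to the same monthly key
    println(monthlyKey("1111", LocalDateTime.parse("2022-01-03T14:13"))) // 1111#2022-01
    println(monthlyKey("1111", LocalDateTime.parse("2022-01-05T11:13"))) // 1111#2022-01
    // A February transaction starts a new aggregate
    println(monthlyKey("1111", LocalDateTime.parse("2022-02-10T09:00"))) // 1111#2022-02
}
```

Because every transaction in a month produces the same key, groupByKey routes them all to one running total per account and month.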

Run the Kafka Streams application

You can run the stream application via the following commands in a terminal:

git clone <kafka-rewards-stream repository URL>
cd kafka-rewards-stream
./mvnw spring-boot:run

If the application runs correctly, you will see logs like these:

2022-08-23 15:42:30.511  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-01 for Month 2022-01: Calculate 33 + 23 = 56
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-01 for Month 2022-01: Calculate 25 + 56 = 81
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-02 for Month 2022-02: Calculate 17 + 0 = 17
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-02 for Month 2022-02: Calculate 23 + 17 = 40
2022-08-23 15:42:30.736  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-03 for Date 2022-01-03: Calculate 12 + 0 = 12
2022-08-23 15:42:30.737  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-04 for Date 2022-01-04: Calculate 33 + 0 = 33
2022-08-23 15:42:30.738  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-05 for Date 2022-01-05: Calculate 25 + 0 = 25

View the result using a Kafka Consumer

A simple way to test and view the results is to use a Kafka consumer to read the messages generated in the daily_points and monthly_points topics. We use the kcat (formerly kafkacat) utility in this tutorial.

  1. Install kcat with the following command (macOS with Homebrew):
    brew install kcat
  2. Configure kcat by creating the file “~/.config/kcat.conf” with the lines:
  3. View the results in the output topics with these commands:
    kcat -C -t daily_points
    kcat -C -t monthly_points

    The output will look like this:

  4. To see the kafka-rewards-stream application continuously aggregate new incoming transactions, edit the file in the kafka-rewards-producer application with the line below:

    Then, generate new messages to the “expenses” topic by running the following:

    cd kafka-rewards-producer
    ./mvnw spring-boot:run

    While the kafka-rewards-stream application is still running, it will continuously process the new messages from the “expenses” topic and aggregate them into the daily_points and monthly_points output topics. A log similar to the one below will be printed:

    2022-08-23 15:43:37.653  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-03 for Month 2022-03: Calculate 54 + 143 = 197
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-04 for Month 2022-04: Calculate 26 + 0 = 26
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=2111#2022-02 for Month 2022-02: Calculate 105 + 43 = 148
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=2111#2022-03 for Month 2022-03: Calculate 40 + 0 = 40
    2022-08-23 15:43:37.655  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-04-17 for Date 2022-04-17: Calculate 26 + 0 = 26
    2022-08-23 15:43:37.655  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=3111#2022-02 for Month 2022-02: Calculate 32 + 0 = 32

    If you use the kcat utility to check the messages in the output topics, you will see new messages added with recalculated values.
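
Why each new transaction yields a new output message with an updated total can be shown with a small in-memory model. This is a simplified sketch under assumptions, not how Kafka Streams is implemented: RunningTotals is a hypothetical class that stands in for the state store behind the aggregation, and the sample values are taken from the March and April lines in the log above.

```kotlin
// Hypothetical in-memory stand-in for the state store behind an aggregated table
class RunningTotals {
    private val store = mutableMapOf<String, Long>()

    // Adds points to the key's total and returns the updated record to publish
    fun add(key: String, points: Long): Pair<String, Long> {
        val updated = (store[key] ?: 0L) + points
        store[key] = updated
        return key to updated
    }
}

fun main() {
    val monthly = RunningTotals()
    println(monthly.add("1111#2022-03", 143)) // (1111#2022-03, 143)
    println(monthly.add("1111#2022-03", 54))  // (1111#2022-03, 197)
    println(monthly.add("1111#2022-04", 26))  // (1111#2022-04, 26)
}
```

Each call returns the latest total for one key, so a consumer reading the output topic sees several messages per key over time and should treat the most recent one as current.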

What’s next?

In this tutorial, we used kcat to verify the messages generated by Kafka Streams. As a next step, you can use a Kafka Connect sink connector to copy data from the topics into a JDBC database or an IBM Cloud Object Storage bucket to save the data permanently.

You may refer to the Using Kafka Connect with Event Streams documentation to learn how to set up Kafka Connect and then follow the existing IBM resources to set up a sink connector:

You may also refer to the following documents to implement your own Kafka Streams with IBM Event Streams:
