September 19, 2022 By Lily Wang 7 min read

How to use IBM Event Streams and Kafka Streams to aggregate values in a customer rewards system.

Event-driven architecture is becoming increasingly popular in big data and cloud environments. Built on open-source Apache Kafka, IBM Event Streams is an event-streaming platform that helps you build smart apps that react to events as they happen. It supports Kafka Streams, a stream-processing library for building applications and microservices that process, analyze and store data in Kafka clusters.

This tutorial demonstrates how to implement a customer rewards system based on expenses using IBM Event Streams and Kafka Streams. The system uses customer transaction expenses as input, converts the expenses to reward points, and then records the accumulated points to Event Streams topics on a daily and monthly basis. The sample producer and stream projects are written as Spring Boot applications using Kotlin.

To implement the system, we will walk you through the following tasks:

  1. Set up an IBM Cloud Event Streams instance.
  2. Produce customer expense data to a topic in the Event Streams instance.
  3. Set up and implement Kafka Streams aggregation functions and save the output to other topics in the Event Streams instance.
  4. Verify the results in the output topics using a consumer.

Prerequisites

To run the sample project, you need to install JDK 1.8 or later on your system and set the JAVA_HOME environment variable to point to your JDK installation.

You also need to install Maven on your local machine (follow this installation guide). Then set the M2_HOME environment variable to point to your Maven directory.

Set up your Event Streams Service in IBM Cloud

  1. Log in to the IBM Cloud console, click the Event Streams service in the Catalog to create an instance in your account, and select the Standard plan for this sample system.
  2. After an Event Streams instance has been provisioned, navigate to the dashboard for the instance and select Topics from the left navigation. Click Create topic and create a new topic with the following configuration:
    • Topic name: expenses; Partitions: 1; Message retention: 7 days
  3. Repeat Step 2 and create three more topics (alternatively, you can script topic creation with the Kafka AdminClient, as sketched after this list):
    • Topic name: daily_points; Partitions: 1; Message retention: 30 days
    • Topic name: monthly_points; Partitions: 1; Message retention: 30 days
    • Topic name: total_points; Partitions: 1; Message retention: 30 days
  4. Select Service credentials from the left navigation and create a new credential by clicking New credential. You need to grant the credential the Manager role because the Kafka Streams application will use the credential to create new topics when it runs later.

    Click View credentials and record the values of “apikey” and “kafka_brokers_sasl”, which will be used in the properties file later.
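
If you prefer to script topic creation instead of clicking through the console, the standard Kafka AdminClient can create the same four topics. The sketch below is only an illustration: the topic names, partition counts and retention periods match the console steps above, while the standalone main function and leaving the replication factor to the broker default are assumptions rather than part of the sample projects.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.TopicConfig
import java.util.Optional
import java.util.Properties
import java.util.concurrent.TimeUnit

fun main() {
    // Connection settings use the values recorded from the Service credentials;
    // replace the placeholders just as you will in the application.properties files later
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafka_brokers_sasl>")
        put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
        put(SaslConfigs.SASL_MECHANISM, "PLAIN")
        put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"<apikey>\";")
    }

    // One partition per topic; retention matches the console settings above.
    // Replication is left to the broker default, since Event Streams manages it for you.
    fun topic(name: String, retentionDays: Long): NewTopic =
        NewTopic(name, Optional.of(1), Optional.empty<Short>())
            .configs(mapOf(TopicConfig.RETENTION_MS_CONFIG to TimeUnit.DAYS.toMillis(retentionDays).toString()))

    AdminClient.create(props).use { admin ->
        admin.createTopics(
            listOf(
                topic("expenses", 7),
                topic("daily_points", 30),
                topic("monthly_points", 30),
                topic("total_points", 30)
            )
        ).all().get()
    }
}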

Produce sample messages to Event Streams

The sample producer project is a Spring Boot application. To run it, you need to download the source code from the GitHub kafka-rewards-producer repository.

The main configuration file for the application is src/main/resources/application.properties. You must replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the Service credentials created earlier and replace <apikey> with the value of “apikey”. Below is a sample of the properties file:

#Transaction CSV File location
transaction.file.location=transaction.csv

#Bootstrap server for kafka
kafka.bootstrap.servers=<kafka_brokers_sasl>

#kafka topic to publish message
kafka.produce.topic=expenses

#kafka cluster security configuration
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<apikey>";
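
For orientation, the sketch below shows one way these properties could be wired into a Spring Kafka producer in Kotlin. The property keys match the file above, but the class name, bean definitions and serializer choices are assumptions for illustration, not necessarily how the kafka-rewards-producer repository is implemented.

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

// Hypothetical configuration class; the kafka-rewards-producer repository may wire this differently
@Configuration
class ProducerConfigSketch(
    @Value("\${kafka.bootstrap.servers}") private val bootstrapServers: String,
    @Value("\${kafka.security.protocol}") private val securityProtocol: String,
    @Value("\${kafka.sasl.mechanism}") private val saslMechanism: String,
    @Value("\${kafka.sasl.jaas.config}") private val saslJaasConfig: String
) {
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val configs = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to securityProtocol,
            SaslConfigs.SASL_MECHANISM to saslMechanism,
            SaslConfigs.SASL_JAAS_CONFIG to saslJaasConfig,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            // Serializing the transaction payload as JSON is an assumption, not confirmed from the repository
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
        )
        return DefaultKafkaProducerFactory(configs)
    }

    @Bean
    fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> =
        KafkaTemplate(producerFactory)
}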

You may notice that the configuration file contains a “transaction.file.location” entry that points to transaction.csv.

In this sample system, we assume a transaction message with the following fields will be created when a customer has an expense:

  • TransactionId
  • Date
  • AccountId
  • Amount

To simplify the process, we use transaction.csv to store multiple transaction records. The producer application will send every record to the Event Streams expenses topic as a message.
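
For illustration, the transaction record can be modeled as a simple Kotlin data class. The field names come from the list above; the types are assumptions inferred from the sample output later in this tutorial and may not match the repository's actual model exactly.

import java.math.BigDecimal
import java.time.LocalDateTime

// Sketch of the transaction payload; field types are assumptions based on the sample data in this tutorial
data class Transaction(
    val transactionId: Long,
    val date: LocalDateTime,
    val accountId: String,
    val amount: BigDecimal
)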

You can run the producer application via the following commands in a terminal:

git clone https://github.com/liwang2017/kafka-rewards-producer.git
cd kafka-rewards-producer
./mvnw spring-boot:run

If the application runs properly, you will see logs like the ones below, which indicate messages have been published to the Kafka topic:

2022-08-08 23:24:36.076  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=2, date=2022-01-03T14:13, accountId=1111, amount=12.11) to kafka
2022-08-08 23:24:36.082  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=3, date=2022-01-04T15:13, accountId=1111, amount=33.22) to kafka
2022-08-08 23:24:36.092  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=4, date=2022-01-05T11:13, accountId=1111, amount=25.00) to kafka
…
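
Putting the pieces together, the publishing step inside TransactionProducer plausibly looks like the sketch below: one KafkaTemplate.send call per CSV record, keyed by the account ID (the stream processor shown later reads the record key as the account ID), followed by the log line above. The class shape, method and field names here are assumptions, not the repository's exact code.

import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

// Hypothetical component; the repository's TransactionProducer may differ in structure and naming
@Component
class TransactionProducerSketch(
    private val kafkaTemplate: KafkaTemplate<String, Any>,
    @Value("\${kafka.produce.topic}") private val topic: String
) {
    private val log = LoggerFactory.getLogger(TransactionProducerSketch::class.java)

    fun send(transaction: Transaction) {
        // Key each message by account ID, which the stream processor later uses to build its aggregation keys
        kafkaTemplate.send(topic, transaction.accountId, transaction)
        log.info("Send transaction $transaction to kafka")
    }
}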

Set up and implement Kafka Streams

The logic for the daily and monthly rewards summaries is the core of this project. We created two stream processors that implement the daily and monthly aggregations using the spring-kafka and kafka-streams libraries. The source code is available in the GitHub kafka-rewards-stream repository. The application consumes messages from the input topic “expenses”, aggregates the amounts using the specified keys and saves the aggregated values to the output topics.

Create the Kafka Streams builders

The main configuration file for the application is src/main/resources/application.properties. You must replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the Service credentials created in your IBM Event Streams instance and replace <apikey> with the value of “apikey”.

The class src/main/kotlin/com/lilywang/kafkarewardsstream/config/KafkaConfig.kt loads the configuration from application.properties, then creates two similar stream builders for the daily and monthly aggregations. Below is the sample code that creates dailyRewardsKafkaStreamsBuilder:

fun dailyRewardsStreamConfig(kafkaConfigProperties: Properties): KafkaStreamsConfiguration {
    // Give the daily aggregation topology its own application.id, then reuse the shared Kafka properties
    val props: MutableMap<String, Any> = HashMap()
    props[APPLICATION_ID_CONFIG] = "daily-rewards-stream"
    props.putAll(kafkaConfigProperties.toMap() as Map<String, String>)
    return KafkaStreamsConfiguration(props)
}

fun dailyRewardsKafkaStreamsBuilder(
    dailyRewardsStreamConfig: KafkaStreamsConfiguration,
    customizerProvider: ObjectProvider<StreamsBuilderFactoryBeanCustomizer>,
    configurerProvider: ObjectProvider<StreamsBuilderFactoryBeanConfigurer>
): StreamsBuilderFactoryBean {
    // Create the factory bean for the daily stream and apply any registered configurers
    val fb = StreamsBuilderFactoryBean(dailyRewardsStreamConfig)
    val configuredBy: MutableSet<StreamsBuilderFactoryBeanConfigurer> = HashSet()
    configurerProvider.orderedStream().forEach { configurer: StreamsBuilderFactoryBeanConfigurer ->
        configurer.configure(fb)
        configuredBy.add(configurer)
    }
    // Apply the customizer only if a unique one exists and has not already configured the factory bean
    val customizer = customizerProvider.ifUnique
    if (customizer != null && !configuredBy.contains(customizer)) {
        customizer.configure(fb)
    }
    return fb
}
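
The monthly builder follows the same pattern. As a sketch (the application ID string and bean wiring here are assumptions), only the application.id changes, which gives the monthly topology its own consumer group and state stores:

fun monthlyRewardsStreamConfig(kafkaConfigProperties: Properties): KafkaStreamsConfiguration {
    val props: MutableMap<String, Any> = HashMap()
    // A distinct application.id keeps the monthly aggregation's offsets and state separate from the daily one
    props[APPLICATION_ID_CONFIG] = "monthly-rewards-stream"
    props.putAll(kafkaConfigProperties.toMap() as Map<String, String>)
    return KafkaStreamsConfiguration(props)
}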

Implement Kafka Streams aggregation

To calculate the daily reward points, the class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/DailyRewardPointAggregator.kt consumes the incoming transactions from the “expenses” topic, aggregates the expense amounts using the key “AccountId#Date” and saves the aggregated values to the “daily_points” Event Streams topic.

The core function of the code is below:

fun buildPipeline(dailyRewardsKafkaStreamsBuilder: StreamsBuilder) {
    // Consume the incoming transactions from the expenses topic
    val transactionStream: KStream<String, Transaction> = dailyRewardsKafkaStreamsBuilder
        .stream("expenses", Consumed.with(STRING_SERDE, customSerdes.transactionSerde()))

    // Re-key each transaction to AccountId#Date and aggregate into DailyRewards
    val dailyRewards: KTable<String, DailyRewards> = transactionStream
        .map { accountId, transaction ->
            KeyValue(
                "$accountId#${transaction.date.toLocalDate()}",
                DailyRewards(
                    accountId = transaction.accountId,
                    date = transaction.date.toLocalDate(),
                    rewardsPoint = transaction.amount.toLong()
                )
            )
        }
        .groupByKey(Grouped.with(Serdes.String(), customSerdes.dailyRewardsSerde()))
        .aggregate(
            this::initialize,
            this::aggregateRewards,
            Materialized.with(STRING_SERDE, customSerdes.dailyRewardsSerde())
        )

    // Publish the aggregated result to the daily_points topic
    dailyRewards.toStream().to("daily_points", Produced.with(Serdes.String(), customSerdes.dailyRewardsSerde()))
}
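
The snippet references two helpers, initialize and aggregateRewards, and a customSerdes component, none of which are shown above. The sketch below illustrates what they could look like, assuming JSON serdes and a simple running total (matching the “12 + 0 = 12” style log lines later in this tutorial); the actual field names, signatures and serde implementation in the repository may differ.

import org.springframework.kafka.support.serializer.JsonSerde
import java.time.LocalDate

// Sketch of the aggregated record for one account and day (field names follow the pipeline code above)
data class DailyRewards(val accountId: String, val date: LocalDate, val rewardsPoint: Long)

// Hypothetical aggregation helpers following the Kafka Streams Initializer/Aggregator contracts
fun initialize(): DailyRewards =
    DailyRewards(accountId = "", date = LocalDate.MIN, rewardsPoint = 0L)

fun aggregateRewards(key: String, newRewards: DailyRewards, accumulated: DailyRewards): DailyRewards =
    accumulated.copy(
        accountId = newRewards.accountId,
        date = newRewards.date,
        // Add the incoming transaction's points to the running total for this AccountId#Date key
        rewardsPoint = accumulated.rewardsPoint + newRewards.rewardsPoint
    )

// Hypothetical serde factory; JSON serdes are one common choice with spring-kafka
// (monthlyRewardsSerde would follow the same pattern)
class CustomSerdes {
    fun transactionSerde(): JsonSerde<Transaction> = JsonSerde(Transaction::class.java)
    fun dailyRewardsSerde(): JsonSerde<DailyRewards> = JsonSerde(DailyRewards::class.java)
}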

The monthly aggregation class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/MonthlyRewardPointAggregator.kt has the same logic as the daily aggregation, except that the aggregation key is “AccountId#YearMonth”:

    // Aggregate to MonthlyRewards by key AccountId#YearMonth
    val monthlyRewards: KTable<String, MonthlyRewards> = transactionStream
        .map { accountId, transaction ->
            KeyValue(
                "$accountId#${transaction.date.let { YearMonth.of(it.year, it.monthValue).toString() }}",
                MonthlyRewards(
                    accountId = transaction.accountId,
                    month = YearMonth.of(transaction.date.year, transaction.date.monthValue),
                    rewardsPoint = transaction.amount.toLong()
                )
            )
        }
        .groupByKey(Grouped.with(Serdes.String(), customSerdes.monthlyRewardsSerde()))
        .aggregate(
            this::initialize,
            this::aggregateRewards,
            Materialized.with(STRING_SERDE, customSerdes.monthlyRewardsSerde())
        )
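
As with the daily pipeline, the aggregated monthly result is then published to an output topic. Assuming the same pattern as the daily code above, the final step writes to monthly_points:

    // Publish the aggregated monthly rewards to the monthly_points output topic
    monthlyRewards.toStream().to("monthly_points", Produced.with(Serdes.String(), customSerdes.monthlyRewardsSerde()))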

Run the Kafka Streams application

You can run the stream application via the following commands in a terminal:

git clone https://github.com/liwang2017/kafka-rewards-stream.git
cd kafka-rewards-stream
./mvnw spring-boot:run

If the application runs correctly, you will see logs like these:

2022-08-23 15:42:30.511  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-01 for Month 2022-01: Calculate 33 + 23 = 56
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-01 for Month 2022-01: Calculate 25 + 56 = 81
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-02 for Month 2022-02: Calculate 17 + 0 = 17
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-02 for Month 2022-02: Calculate 23 + 17 = 40
…
2022-08-23 15:42:30.736  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-03 for Date 2022-01-03: Calculate 12 + 0 = 12
2022-08-23 15:42:30.737  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-04 for Date 2022-01-04: Calculate 33 + 0 = 33
2022-08-23 15:42:30.738  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-05 for Date 2022-01-05: Calculate 25 + 0 = 25
…

View the result using a Kafka Consumer

A simple way to test and view the results is to use a Kafka consumer to read the messages generated in the daily_points and monthly_points topics. We use the kcat (formerly kafkacat) utility in this tutorial; if you prefer to stay in Kotlin, a minimal consumer alternative is sketched after these steps.

  1. Install kcat with the following command:
    brew install kcat
  2. Configure kcat by creating a file “~/.config/kcat.conf” with the following lines:
    bootstrap.servers=<kafka_brokers_sasl>
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.username=token
    sasl.password=<apikey>
  3. View the results in the output topics via these commands:
    kcat -C -t daily_points
    kcat -C -t monthly_points

    The output shows the aggregated daily and monthly reward points for each account.

  4. To show that the kafka-rewards-stream application continuously aggregates new incoming transactions, you can edit the application.properties file in the kafka-rewards-producer application with the line below:
    transaction.file.location=transaction2.csv

    Then, generate new messages to the “expenses” topic by running the following:

    cd kafka-rewards-producer
    ./mvnw spring-boot:run

    While the kafka-rewards-stream application is still running, it will continuously process the new messages from the “expenses” topic and aggregate them into the daily_points and monthly_points output topics. Logs similar to the ones below will be printed:

    2022-08-23 15:43:37.653  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-03 for Month 2022-03: Calculate 54 + 143 = 197
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-04 for Month 2022-04: Calculate 26 + 0 = 26
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=2111#2022-02 for Month 2022-02: Calculate 105 + 43 = 148
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=2111#2022-03 for Month 2022-03: Calculate 40 + 0 = 40
    2022-08-23 15:43:37.655  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-04-17 for Date 2022-04-17: Calculate 26 + 0 = 26
    2022-08-23 15:43:37.655  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=3111#2022-02 for Month 2022-02: Calculate 32 + 0 = 32
    …

    Using the kcat utility to check the messages in the output topics, you will see new messages added with recalculated values.
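
If you would rather not install kcat, a plain Kotlin consumer can read the same output topics. This is a minimal sketch using the standard Kafka client and the same SASL credentials; it simply prints each record's key and value:

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    // Same SASL settings as in kcat.conf; replace the placeholders with your Service credentials values
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafka_brokers_sasl>")
        put(ConsumerConfig.GROUP_ID_CONFIG, "rewards-viewer")
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
        put(SaslConfigs.SASL_MECHANISM, "PLAIN")
        put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"<apikey>\";")
    }
    KafkaConsumer<String, String>(props).use { consumer ->
        // Read both output topics and print each aggregated record as it arrives
        consumer.subscribe(listOf("daily_points", "monthly_points"))
        while (true) {
            consumer.poll(Duration.ofSeconds(5)).forEach { record ->
                println("${record.topic()} ${record.key()} -> ${record.value()}")
            }
        }
    }
}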

What’s next?

In this tutorial, we used kcat to verify the messages generated by Kafka Streams. To persist the data permanently, you can use a Kafka Connect sink connector to copy data from the topics into a JDBC database or an IBM Cloud Object Storage bucket.

You may refer to the Using Kafka Connect with Event Streams documentation to learn how to set up Kafka Connect and then follow the existing IBM resources to set up a sink connector.

You may also refer to the Kafka Streams and IBM Event Streams documentation to implement your own Kafka Streams applications with IBM Event Streams.
