How to use IBM Event Streams and Kafka Streams to aggregate values in a customer rewards system.

Event-driven architecture is becoming increasingly popular in big data and cloud environments. Built on open-source Apache Kafka, IBM Event Streams is an event-streaming platform that helps you build smart apps that can react to events as they happen. It supports Kafka Streams, a stream processing library for building applications and microservices that process, analyze, and store data in Kafka clusters.

This tutorial demonstrates how to implement a customer rewards system based on expenses using IBM Event Streams and Kafka Streams. The system uses customer transaction expenses as input, converts the expenses to reward points, then records the accumulated points to Event Streams topics on a daily and monthly basis. The sample producer and stream projects are written as Spring Boot applications in Kotlin.

To implement the system, we will walk you through the following tasks:

  1. Set up an IBM Cloud Event Streams instance.
  2. Produce customer expense data to a topic in the Event Streams instance.
  3. Set up and implement a Kafka Streams aggregation function and save the output to other topics in the Event Streams instance.
  4. Verify the result in new topics using a consumer.

Prerequisites

To run the sample projects, you need a JDK (version 1.8 or later) installed on your system, with the JAVA_HOME environment variable pointing to your JDK installation.

You also need to install Maven on your local machine (follow this installation guide). Then set the M2_HOME environment variable pointing to your Maven directory.

Set up your Event Streams Service in IBM Cloud

  1. Log in to the IBM Cloud console, select the Event Streams service from the Catalog, and create an instance in your account. Select the Standard plan for this sample system.
  2. After an Event Streams instance has been provisioned, navigate to the dashboard for the instance and select Topics from the left navigation. Click Create topic and create a new topic with the following configuration:
    • Topic name: expenses; Partitions: 1; Message retention: 7 days
  3. Repeat Step 2 and create three more topics:
    • Topic name: daily_points; Partitions: 1; Message retention: 30 days
    • Topic name: monthly_points; Partitions: 1; Message retention: 30 days
    • Topic name: total_points; Partitions: 1; Message retention: 30 days
  4. Select Service credentials from the left navigation and create a new credential by clicking New credential. You need to grant the credential the Manager role because the Kafka Streams application will use it to create new topics when it runs later.

    Click View credentials and record the values of “apikey” and “kafka_brokers_sasl”, which will be used in the properties file later.

Produce sample messages to Event Streams

The sample producer project is a Spring Boot application. To run it, you need to download the source code from the GitHub kafka-rewards-producer repository.

The main configuration file for the application is src/main/resources/application.properties. You must replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the Service credentials created earlier and replace <apikey> with the value of “apikey”. Below is a sample of the properties file:

#Transaction CSV File location
transaction.file.location=transaction.csv

#Bootstrap server for kafka
kafka.bootstrap.servers=<kafka_brokers_sasl>

#kafka topic to publish message
kafka.produce.topic=expenses

#kafka cluster security configuration
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<apikey>";
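On the producer side, these properties are typically injected and turned into a Kafka producer configuration. The sketch below shows how a Spring Kafka KafkaTemplate could be built from them; the helper function, its parameters, and the use of string serializers are illustrative assumptions rather than the repository's exact code (the real application serializes Transaction objects):

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate

// Hypothetical helper: builds a KafkaTemplate from the values in application.properties.
fun buildKafkaTemplate(
    bootstrapServers: String,  // kafka.bootstrap.servers
    securityProtocol: String,  // kafka.security.protocol
    saslMechanism: String,     // kafka.sasl.mechanism
    saslJaasConfig: String     // kafka.sasl.jaas.config
): KafkaTemplate<String, String> {
    val config = mapOf<String, Any>(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to securityProtocol,
        SaslConfigs.SASL_MECHANISM to saslMechanism,
        SaslConfigs.SASL_JAAS_CONFIG to saslJaasConfig
    )
    return KafkaTemplate(DefaultKafkaProducerFactory<String, String>(config))
}

The kafka.produce.topic value would then be used as the target topic when calling send on the template.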

You may notice that the configuration file contains a “transaction.file.location” entry that points to transaction.csv.

In this sample system, we assume a transaction message with the following fields will be created when a customer has an expense:

  • TransactionId
  • Date
  • AccountId
  • Amount

To simplify the process, we use transaction.csv to store multiple transaction records. The producer application will send every record to the Event Streams expenses topic as a message.
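For reference, each record maps naturally to a small Kotlin data class in the producer. Below is a sketch of what such a model might look like, based on the fields listed above; the field types (including the use of Double for the amount) are assumptions, and the class in the repository may differ:

import java.time.LocalDateTime

// Hypothetical sketch of the transaction model used by the producer.
data class Transaction(
    val transactionId: Long,     // e.g. 2
    val date: LocalDateTime,     // e.g. 2022-01-03T14:13
    val accountId: String,       // e.g. "1111" (could also be modeled as a number)
    val amount: Double           // e.g. 12.11 (a real project might prefer BigDecimal)
)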

You can run the producer application via the following commands in a terminal:

git clone https://github.com/liwang2017/kafka-rewards-producer.git
cd kafka-rewards-producer
./mvnw spring-boot:run

If the application runs properly, you will see logs like the ones below, which indicate messages have been published to the Kafka topic:

2022-08-08 23:24:36.076  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=2, date=2022-01-03T14:13, accountId=1111, amount=12.11) to kafka
2022-08-08 23:24:36.082  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=3, date=2022-01-04T15:13, accountId=1111, amount=33.22) to kafka
2022-08-08 23:24:36.092  INFO 49064 --- [           main] c.l.r.producer.TransactionProducer       : Send transaction Transaction(transactionId=4, date=2022-01-05T11:13, accountId=1111, amount=25.00) to kafka
…

Set up and implement Kafka Streams

The daily and monthly rewards aggregation logic is the core of this project. We created two stream processors that implement the daily and monthly aggregations using the spring-kafka and kafka-streams libraries. The source code is available in the GitHub kafka-rewards-stream repository. The application consumes messages from the input topic “expenses”, aggregates the amounts using the specified keys, and saves the aggregated values to the output topics.

Create the Kafka Streams builder

The main configuration file for the application is src/main/resources/application.properties. You must replace <kafka_brokers_sasl> with the value of “kafka_brokers_sasl” from the Service credentials created in your IBM Event Streams instance and replace <apikey> with the value of “apikey”.

The class src/main/kotlin/com/lilywang/kafkarewardsstream/config/KafkaConfig.kt loads the configuration from application.properties, then creates two parallel stream builders, one for the daily aggregation and one for the monthly aggregation, each with its own application ID. Below is the sample code that creates dailyRewardsKafkaStreamsBuilder:

    fun dailyRewardsStreamConfig(kafkaConfigProperties: Properties): KafkaStreamsConfiguration {
        // Give the daily topology its own application ID so it runs as a separate stream application
        val props: MutableMap<String, Any> = HashMap()
        props[APPLICATION_ID_CONFIG] = "daily-rewards-stream"
        props.putAll(kafkaConfigProperties.toMap() as Map<String, String>)
        return KafkaStreamsConfiguration(props)
    }

    fun dailyRewardsKafkaStreamsBuilder(
        dailyRewardsStreamConfig: KafkaStreamsConfiguration,
        customizerProvider: ObjectProvider<StreamsBuilderFactoryBeanCustomizer>,
        configurerProvider: ObjectProvider<StreamsBuilderFactoryBeanConfigurer>
    ): StreamsBuilderFactoryBean {
        // Build the factory bean for the daily topology and apply any registered configurers/customizers
        val fb = StreamsBuilderFactoryBean(dailyRewardsStreamConfig)
        val configuredBy: MutableSet<StreamsBuilderFactoryBeanConfigurer> = HashSet<StreamsBuilderFactoryBeanConfigurer>()
        configurerProvider.orderedStream().forEach { configurer: StreamsBuilderFactoryBeanConfigurer ->
            configurer.configure(fb)
            configuredBy.add(configurer)
        }
        val customizer = customizerProvider.ifUnique
        if (customizer != null && !configuredBy.contains(customizer)) {
            customizer.configure(fb)
        }
        return fb
    }
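The monthly builder follows the same pattern; only the Kafka Streams application ID differs, which keeps the two topologies in separate consumer groups and state stores. Below is a minimal sketch of the monthly configuration (the exact application ID string is an assumption; check KafkaConfig.kt in the repository for the actual value):

    fun monthlyRewardsStreamConfig(kafkaConfigProperties: Properties): KafkaStreamsConfiguration {
        val props: MutableMap<String, Any> = HashMap()
        // A distinct application.id keeps the monthly topology separate from the daily one
        // ("monthly-rewards-stream" is an assumed value).
        props[APPLICATION_ID_CONFIG] = "monthly-rewards-stream"
        props.putAll(kafkaConfigProperties.toMap() as Map<String, String>)
        return KafkaStreamsConfiguration(props)
    }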

Implement Kafka Streams aggregation

To calculate the daily reward points, the class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/DailyRewardPointAggregator.kt consumes the incoming transactions from the “expenses” topic, aggregates the expense amounts using the key “AccountId#Date”, and saves the aggregated values to the “daily_points” Event Streams topic.

The core function of the code is below:

fun buildPipeline(dailyRewardsKafkaStreamsBuilder: StreamsBuilder) {
        //Consume from topic expenses
        val transactionStream: KStream<String, Transaction> = dailyRewardsKafkaStreamsBuilder
            .stream("expenses", Consumed.with(STRING_SERDE, customSerdes.transactionSerde()))

        //Aggregate to DailyRewards by key AccountId#Date
        val dailyRewards: KTable<String, DailyRewards> = transactionStream
            .map { accountId, transaction ->
                KeyValue(
                    "$accountId#${transaction.date.toLocalDate()}",
                    DailyRewards(
                        accountId = transaction.accountId,
                        date = transaction.date.toLocalDate(),
                        rewardsPoint = transaction.amount.toLong()
                    )
                )
            }
            .groupByKey(Grouped.with(Serdes.String(), customSerdes.dailyRewardsSerde()))
            .aggregate(
                this::initialize,
                this::aggregateRewards,
                Materialized.with(STRING_SERDE, customSerdes.dailyRewardsSerde())
            )

        //Publish result to topic daily_points
        dailyRewards.toStream().to("daily_points", Produced.with(Serdes.String(), customSerdes.dailyRewardsSerde()))
    }
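The initializer and aggregator referenced above (this::initialize and this::aggregateRewards) are defined elsewhere in DailyRewardPointAggregator.kt. A minimal sketch of what they might look like is shown below, assuming DailyRewards exposes the accountId, date, and rewardsPoint properties used in the map step; the actual implementation in the repository may differ, for example by also logging the calculation as seen in the logs later:

    // Hypothetical sketch: start each aggregation with zero points.
    // Assumes accountId is a String and date is a java.time.LocalDate.
    private fun initialize(): DailyRewards = DailyRewards(
        accountId = "",
        date = LocalDate.MIN,
        rewardsPoint = 0L
    )

    // Hypothetical sketch: add the new record's points to the running total,
    // matching the "new + accumulated" pattern shown in the application logs.
    private fun aggregateRewards(key: String, newRewards: DailyRewards, accumulated: DailyRewards): DailyRewards =
        DailyRewards(
            accountId = newRewards.accountId,
            date = newRewards.date,
            rewardsPoint = newRewards.rewardsPoint + accumulated.rewardsPoint
        )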

The monthly aggregation class src/main/kotlin/com/lilywang/kafkarewardsstream/processor/MonthlyRewardPointAggregator.kt has the same logic as the daily aggregation, except the aggregation key is different — it uses “AccountId#YearMonth”:

        //Aggregate to MonthlyRewards by key AccountId#YearMonth
        val monthlyRewards: KTable<String, MonthlyRewards> = transactionStream
            .map { accountId, transaction ->
                KeyValue(
                    "$accountId#${transaction.date.let{YearMonth.of(it.year, it.monthValue).toString()}}",
                    MonthlyRewards(
                        accountId = transaction.accountId,
                        month = YearMonth.of(transaction.date.year, transaction.date.monthValue),
                        rewardsPoint = transaction.amount.toLong()
                    )
                )
            }
            .groupByKey(Grouped.with(Serdes.String(), customSerdes.monthlyRewardsSerde()))
            .aggregate(
                this::initialize,
                this::aggregateRewards,
                Materialized.with(STRING_SERDE, customSerdes.monthlyRewardsSerde())
            )
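As with the daily pipeline, the monthly results are written back to Event Streams. The final step of the monthly pipeline publishes the aggregated table to the monthly_points topic; a sketch mirroring the daily version above:

        //Publish result to topic monthly_points
        monthlyRewards.toStream().to("monthly_points", Produced.with(Serdes.String(), customSerdes.monthlyRewardsSerde()))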

Run the Kafka Streams application

You can run the stream application via the following commands in a terminal:

git clone https://github.com/liwang2017/kafka-rewards-stream.git
cd kafka-rewards-stream
./mvnw spring-boot:run

If the application runs correctly, you will see logs like these:

2022-08-23 15:42:30.511  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-01 for Month 2022-01: Calculate 33 + 23 = 56
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-01 for Month 2022-01: Calculate 25 + 56 = 81
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-02 for Month 2022-02: Calculate 17 + 0 = 17
2022-08-23 15:42:30.512  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-02 for Month 2022-02: Calculate 23 + 17 = 40
…
2022-08-23 15:42:30.736  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-03 for Date 2022-01-03: Calculate 12 + 0 = 12
2022-08-23 15:42:30.737  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-04 for Date 2022-01-04: Calculate 33 + 0 = 33
2022-08-23 15:42:30.738  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-01-05 for Date 2022-01-05: Calculate 25 + 0 = 25
…

View the result using a Kafka Consumer

A simple way to test and view the results is to use a Kafka consumer to read the messages generated in the daily_points and monthly_points topics. We use the kcat (formerly kafkacat) utility in this tutorial.

  1. Install kcat with the following command:
    brew install kcat
  2. Configure kcat by creating a file “~/.config/kcat.conf” with the following lines:
    bootstrap.servers=<kafka_brokers_sasl>
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.username=token
    sasl.password=<apikey>
  3. View the results in the output topics via these commands:
    kcat -C -t daily_points
    kcat -C -t monthly_points

    The output shows the aggregated daily and monthly rewards messages that the stream application wrote to each topic.

  4. To show that the kafka-rewards-stream application continuously aggregates new incoming transactions, you can edit the application.properties file in the kafka-rewards-producer application with the line below:
    transaction.file.location=transaction2.csv

    Then, generate new messages to the “expenses” topic by running the following:

    cd kafka-rewards-producer
    ./mvnw spring-boot:run

    While the kafka-rewards-stream application is still running, it will continuously process the new messages from the “expenses” topic and aggregate them into the daily_points and monthly_points output topics. Logs similar to the ones below will be printed:

    2022-08-23 15:43:37.653  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-03 for Month 2022-03: Calculate 54 + 143 = 197
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=1111#2022-04 for Month 2022-04: Calculate 26 + 0 = 26
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=2111#2022-02 for Month 2022-02: Calculate 105 + 43 = 148
    2022-08-23 15:43:37.654  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=2111#2022-03 for Month 2022-03: Calculate 40 + 0 = 40
    2022-08-23 15:43:37.655  INFO 67643 --- [-StreamThread-1] c.l.k.p.DailyRewardPointAggregator       : AccountId=1111#2022-04-17 for Date 2022-04-17: Calculate 26 + 0 = 26
    2022-08-23 15:43:37.655  INFO 67643 --- [-StreamThread-1] c.l.k.p.MonthlyRewardPointAggregator     : AccountId=3111#2022-02 for Month 2022-02: Calculate 32 + 0 = 32
    …

    Use the kcat utility again to check the messages in the output topics, and you will see new messages with the recalculated values.

What’s next?

In this tutorial, we used kcat to verify the messages generated by Kafka Streams. To store the data permanently, you can use a Kafka Connect sink connector to copy data from the topics into a JDBC database or an IBM Cloud Object Storage bucket.

You may refer to the Using Kafka Connect with Event Streams documentation to learn how to set up Kafka Connect, and then follow the existing IBM resources to set up a sink connector.

You may also refer to the IBM Event Streams and Kafka Streams documentation to implement your own Kafka Streams applications with IBM Event Streams.
