Confluent adds governance, data integration, observability, and operational management to existing Kafka systems to streamline and scale data streaming.
Apache Kafka is a powerful distributed event streaming system that moves data between systems in real time. Event-driven architectures are a common strategy for handling streaming data: the continuous generation and transmission of data from sources such as sensors, websites and financial transactions.
However, enterprises often need more than Kafka alone provides in data governance, data security, data integration, observability and operational data management. Confluent packages all of these capabilities into a data platform that reduces the complexity of deploying and operating Kafka at scale.
Running Kafka clusters in production requires managing brokers, storage, networking, scaling, upgrades, monitoring and disaster recovery. Confluent offers managed cloud services and operational tooling that reduce the burden on infrastructure teams and allow developers to focus on building applications rather than maintaining clusters.
Rather than treating Kafka as just a messaging system, Confluent promotes Kafka as a central nervous system for enterprise data. Events can be shared in real time across applications, advanced analytics systems, artificial intelligence and machine learning platforms, and operational systems.
This enables data architectures that are more responsive, depend less on batch processing and batch data ingestion, and speed up data-driven decision-making and business intelligence.
To see how this story plays out, imagine a large health insurance provider, “Wellspring Health”, that already runs Apache Kafka on-premises to process healthcare and insurance events.
Their legacy system contains a Kafka environment that handles:
· Medical claims
· Pharmacy transactions
· Fraud detection
· Provider billing events
· Member eligibility updates
The system works, but data teams struggle with inconsistent schemas, unknown downstream consumers, custom ETL pipelines and slow onboarding for data analytics. To improve operational efficiency, Wellspring plans to adopt Confluent Cloud as part of a data modernization initiative that upgrades its streaming platform while preserving its existing Kafka investments and knowledge.
The highest priorities on the roadmap for Wellspring are to:
1. Detect potentially fraudulent claims in real time
2. Provide access controls to govern sensitive data structures
3. Easily provide Iceberg snapshots for ingestion into a data lake or data warehouse
In this tutorial, you’ll see how Wellspring can modernize its data infrastructure and Kafka architecture with Confluent to create derived data streams, improve data processing and build dashboards. The result is a more scalable and unified data infrastructure and analytics ecosystem.
The first step along this path is to take the Kafka streams that they’re already using in an on-premises environment and move them to Confluent Cloud.
You can find the code for this tutorial in the Confluent Modernization tutorial in our GitHub repository.
Create an account in Confluent Cloud. You can sign up with GitHub, Google or your own email address. Answer the initial questions however you’d like. When you reach the step to create your own cluster, select “Explore other cluster types and pricing”.
Create the environment. For the purposes of this demo, call it “default”.
Next, create the cluster. For this demo, the cluster is named “tutorial_cluster”, but you can choose any name you like. Use a Basic cluster, because its pricing suits a simple proof of concept.
Now you’re ready to create your first Kafka topic. A Kafka topic is the fundamental unit of organization in Apache Kafka. You can think of it as a feed name or logical channel where data records are published and stored.
After launching your cluster, create the first topic, called
Now, edit the data contract:
The JSON for this data contract is:
Now you’ll add the topic’s data contract to the schema registry. The schema registry provides a single point of governance, so all clients that read from or write to a topic know what data to expect.
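As a minimal sketch of what knowing what to expect means in practice, the Python function below checks a record against a JSON Schema data contract before a client uses it. The contract and its field names (claim_id, provider_id, procedure_code, claim_amount) are hypothetical stand-ins, not the contract from this tutorial, and the check covers only required fields and basic types. A full validator such as the jsonschema package handles the rest of the specification.

```python
import json

# Hypothetical data contract for a medical claim; the tutorial's real
# contract may use different field names.
CLAIM_SCHEMA = json.loads("""
{
  "type": "object",
  "properties": {
    "claim_id": {"type": "string"},
    "provider_id": {"type": "string"},
    "procedure_code": {"type": "string"},
    "claim_amount": {"type": "number"}
  },
  "required": ["claim_id", "provider_id", "procedure_code", "claim_amount"]
}
""")

# The subset of JSON Schema type names this sketch understands.
_TYPE_MAP = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "object": dict,
    "array": list,
}


def contract_errors(record, schema):
    """Return a list of problems with record; an empty list means it passed."""
    errors = []
    # Every field listed under "required" must be present.
    for field in schema.get("required", []):
        if field not in record:
            errors.append(f"missing required field: {field}")
    # Fields that are present must match their declared basic type.
    for field, spec in schema.get("properties", {}).items():
        if field not in record:
            continue
        type_name = spec.get("type")
        expected = _TYPE_MAP.get(type_name)
        if expected is None:
            continue  # Types outside the subset above are not checked.
        value = record[field]
        # bool is a subclass of int in Python, so reject it for numeric types.
        bool_as_number = isinstance(value, bool) and type_name in ("number", "integer")
        if not isinstance(value, expected) or bool_as_number:
            errors.append(f"wrong type for {field}: expected {type_name}")
    return errors


claim = {"claim_id": "c-1", "provider_id": "p-1", "procedure_code": "7371", "claim_amount": 12500.0}
print(contract_errors(claim, CLAIM_SCHEMA))  # → []
```

A producer can refuse to send any record for which the list is not empty, which keeps malformed events out of the topic.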
To open the schema registry, navigate to your cluster:
Then, select ‘Data contracts’ and then ‘Add data contract’:
Next, enter the data contract for
Now select the API endpoint and note the URL. Any client can use this URL to access the schema registry. To see this in action, create an API key so that you can access the schema registry remotely.
Select your username from the upper-right menu, navigate to the API keys view and select Add API key. Enter any name you like, then select the schema registry as the key scope and default as the environment.
This generates a new key. Download the key, open the downloaded text file and copy the values into your .env file. Save the API key as
You can now test the schema registry like so. You’ll want to create a Python environment and install the libraries found in the
This shows how any client that accesses a stream can retrieve the stream’s data contract, including any changes to it, before reading from or writing to the stream.
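The lookup that a client performs can be sketched with the Python standard library alone. This sketch assumes a JSON Schema contract registered under the default TopicNameStrategy subject name (the topic name plus "-value"), and it uses the schema registry REST call GET /subjects/{subject}/versions/latest with HTTP basic authentication built from the schema registry API key and secret. The topic name medical_claims and the function names are illustrative.

```python
import base64
import json
import urllib.parse
import urllib.request


def value_subject(topic):
    """Subject name for a topic's value schema under the default TopicNameStrategy."""
    return f"{topic}-value"


def build_latest_schema_request(sr_url, api_key, api_secret, topic):
    """Build (without sending) a GET for the latest registered version of the topic's value schema."""
    subject = urllib.parse.quote(value_subject(topic), safe="")
    url = f"{sr_url.rstrip('/')}/subjects/{subject}/versions/latest"
    # Schema registry API keys are sent as HTTP basic auth: key as user, secret as password.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return urllib.request.Request(
        url,
        headers={
            "Authorization": f"Basic {token}",
            "Accept": "application/vnd.schemaregistry.v1+json",
        },
        method="GET",
    )


def fetch_latest_schema(sr_url, api_key, api_secret, topic):
    """Send the request and return the response with the schema parsed.

    Needs network access and valid credentials. Assumes a JSON Schema (or Avro)
    contract, because the registry returns the schema as a JSON-encoded string.
    """
    request = build_latest_schema_request(sr_url, api_key, api_secret, topic)
    with urllib.request.urlopen(request, timeout=10) as response:
        body = json.load(response)
    body["schema"] = json.loads(body["schema"])
    return body
```

Calling fetch_latest_schema with your endpoint URL, key, secret and topic name returns the registered contract together with its version number, so a client can notice when the contract has changed.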
Next, you can create derived topics in Confluent Cloud by using ksqlDB statements. Derived topics can check data quality, aggregate or process event streams, send notifications and support many other data workflows. In Confluent architectures, derived topics are typically generated through:
· Stream processing
· Joins
· Aggregations
· Enrichment pipelines
These are often built with:
· ksqlDB
· Kafka Streams
· Apache Flink
You’ll see how Confluent ksqlDB can help Wellspring create two derived topics to streamline data access. The first tracks high-value claims of more than USD 10,000.
You’ll need a globally scoped key for the ksqlDB operations. Select your username from the upper right menu and then navigate to the API keys view, then select add API key. Enter whatever name that you’d like and select global for the key scope.
It generates a new key that you can use across Confluent Cloud. Download the key and open that text file and copy the values into your .env file. Save the API key from the text file as
Also copy your user ID from your user settings.
Save this to your .env file as
Open the environment details in the environment tab:
Copy the env ID and save it to your .env file as
Open the Cluster details in the Cluster tab:
Copy the cluster ID and save it to your .env file as CLUSTER_ID.
Now you’ll create a ksqlDB cluster by using the REST URL from your environment:
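For reference, here is a hedged Python sketch of a ksqlDB cluster creation request. It assumes the Confluent Cloud ksqlDB cluster-management endpoint https://api.confluent.cloud/ksqldbcm/v2/clusters and a request body shaped as below, authenticated with the globally scoped API key. The field names reflect our reading of that API and should be checked against the current Confluent Cloud API reference before use. The IDs are the user ID, environment ID and cluster ID you copied earlier; the display name is hypothetical.

```python
import base64
import json
import urllib.request

# Assumed endpoint of the Confluent Cloud ksqlDB cluster-management API (v2).
KSQLDB_CLUSTERS_URL = "https://api.confluent.cloud/ksqldbcm/v2/clusters"


def ksqldb_cluster_payload(name, env_id, kafka_cluster_id, user_id, csu=1):
    """Request body for creating a ksqlDB cluster (field names are assumptions to verify)."""
    return {
        "spec": {
            "display_name": name,
            # Size in Confluent Streaming Units; check which sizes your account allows.
            "csu": csu,
            # The Kafka cluster that ksqlDB reads from and writes to.
            "kafka_cluster": {"id": kafka_cluster_id, "environment": env_id},
            # The identity (here, your user ID) that ksqlDB acts as.
            "credential_identity": {"id": user_id},
            "environment": {"id": env_id},
        }
    }


def build_create_ksqldb_request(api_key, api_secret, payload):
    """Build (without sending) the POST, using the global API key as basic auth."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return urllib.request.Request(
        KSQLDB_CLUSTERS_URL,
        data=json.dumps(payload).encode(),
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method="POST",
    )
```

Passing the built request to urllib.request.urlopen sends it; building and sending are kept separate so the payload can be inspected first.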
Now you’ve created a ksqlDB cluster. ksqlDB is a purpose-built event streaming database that lets you process and analyze data in Apache Kafka by using a lightweight, SQL-like syntax. To use your new ksqlDB cluster, go to the Confluent Cloud console and open the ksqlDB menu item. The cluster might take a few minutes to provision and show up in the Confluent Cloud UI.
Go to settings and copy the ‘REST endpoint URL’.
Save this to your .env file as KSQL_ENDPOINT.
Now you can create a base stream to work with ksqlDB:
With that base stream in place, you can use SQL statements to filter the data coming from the base stream into a derived stream for real-time analytics:
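The base stream and the derived high-value stream can be sketched as two ksqlDB statements sent to the /ksql path of the REST endpoint URL saved as KSQL_ENDPOINT. This sketch assumes the topic is named medical_claims, that its values are serialized in the JSON Schema format registered in the schema registry (VALUE_FORMAT='JSON_SR', which lets ksqlDB infer the columns), and that the amount column is called claim_amount. Adjust these hypothetical names to your own data contract. The request is built but not sent, so the sketch runs without credentials.

```python
import base64
import json
import urllib.request

# Hypothetical stream, topic and column names; match them to your data contract.
BASE_STREAM_SQL = (
    "CREATE STREAM medical_claims "
    "WITH (KAFKA_TOPIC='medical_claims', VALUE_FORMAT='JSON_SR');"
)
# Derived stream: only claims above USD 10,000.
HIGH_VALUE_SQL = (
    "CREATE STREAM high_value_claims AS "
    "SELECT * FROM medical_claims WHERE claim_amount > 10000 EMIT CHANGES;"
)


def build_ksql_request(ksql_endpoint, api_key, api_secret, statements, properties=None):
    """Build (without sending) a POST to ksqlDB's /ksql endpoint for one or more statements."""
    body = {
        "ksql": " ".join(statements),  # Each statement already ends with a semicolon.
        "streamsProperties": properties or {},
    }
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return urllib.request.Request(
        f"{ksql_endpoint.rstrip('/')}/ksql",
        data=json.dumps(body).encode(),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/vnd.ksql.v1+json",
            "Accept": "application/vnd.ksql.v1+json",
        },
        method="POST",
    )


request = build_ksql_request(
    "https://example-ksqldb-endpoint:443",  # placeholder for your KSQL_ENDPOINT value
    "API_KEY",
    "API_SECRET",
    [BASE_STREAM_SQL, HIGH_VALUE_SQL],
    # Also process records already in the topic, not only new ones.
    {"ksql.streams.auto.offset.reset": "earliest"},
)
```

Send it with urllib.request.urlopen using an API key that your ksqlDB cluster accepts; ksqlDB answers with a JSON array that has one entry per statement.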
You can also create more complex windowed streams. For instance, certain kinds of procedures should trigger an audit, and multiple auditable claims in a short period of time might indicate that a provider’s systems have been compromised in a data breach.
To help track this, you’ll create an auditor_stream of claims over USD 5,000 for procedures ‘7371’, ‘2710’ and ‘1831’. Then, create a new stream that detects whether there are multiple items in the auditor stream in any 10-minute period and capture that as
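The windowing logic can be sketched in plain Python. In ksqlDB, a windowed count like this one is usually expressed as a table, as in the comment at the top of the block; the Python version mirrors it with tumbling (fixed, non-overlapping) 10-minute windows over millisecond timestamps. The field names and the provider_spike table name are hypothetical. Because tumbling windows reset at fixed boundaries, two claims a minute apart can land in different windows; a hopping window is one way to catch bursts that straddle a boundary.

```python
from collections import Counter

# Rough ksqlDB equivalent (hypothetical names), producing a windowed table:
#   CREATE TABLE provider_spike AS
#     SELECT provider_id, COUNT(*) AS claim_count
#     FROM auditor_stream
#     WINDOW TUMBLING (SIZE 10 MINUTES)
#     GROUP BY provider_id
#     HAVING COUNT(*) > 1
#     EMIT CHANGES;

AUDIT_PROCEDURES = {"7371", "2710", "1831"}
AUDIT_THRESHOLD_USD = 5000
WINDOW_MS = 10 * 60 * 1000  # 10 minutes in milliseconds, like Kafka timestamps


def is_auditable(claim):
    """Filter for auditor_stream: a listed procedure and an amount over USD 5,000."""
    return (
        claim["procedure_code"] in AUDIT_PROCEDURES
        and claim["claim_amount"] > AUDIT_THRESHOLD_USD
    )


def provider_spikes(claims, window_ms=WINDOW_MS, min_count=2):
    """Count auditable claims per provider in tumbling windows.

    Returns {(provider_id, window_start_ms): count} for windows whose count
    reaches min_count, that is, multiple auditable claims in one window.
    """
    counts = Counter()
    for claim in claims:
        if not is_auditable(claim):
            continue
        ts = int(claim["timestamp"])
        window_start = ts - ts % window_ms  # Align to the start of the tumbling window.
        counts[(claim["provider_id"], window_start)] += 1
    return {key: n for key, n in counts.items() if n >= min_count}
```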
Note that creating these derived streams adds a prefix of extra characters to the name of each derived topic. You can see the correct names in the topics view for your cluster:
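Rather than hard-coding a generated name, a client can look it up. The hypothetical helper below picks the single topic whose name ends with the stream name, ignoring case, which tolerates whatever prefix was added. The list of names could come from the confluent-kafka AdminClient, whose list_topics() call returns cluster metadata with a topics dictionary keyed by topic name.

```python
def find_derived_topic(topic_names, stream_name):
    """Return the one topic whose name ends with stream_name (case-insensitive).

    Raises LookupError when there is no match or more than one, so an
    ambiguous name is never picked silently.
    """
    suffix = stream_name.upper()
    matches = [name for name in topic_names if name.upper().endswith(suffix)]
    if len(matches) != 1:
        raise LookupError(f"expected one topic ending in {suffix!r}, found {matches}")
    return matches[0]
```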
Now that you’ve created the derived topics, you can query them. Run the following code in a new terminal window so that you can see results come in as they are published to the base
Because you haven’t yet created data that would trigger a provider spike, nothing shows here. To create this data, write to the medical_claims stream.
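A minimal consumer for watching a derived topic might look like the following sketch. It assumes the confluent-kafka Python package and an API key scoped to the cluster, such as the one created in the next step; the bootstrap server, key, secret and consumer group name are placeholders that you supply. The configuration keys are standard librdkafka settings for SASL_SSL with PLAIN authentication, as used by Confluent Cloud.

```python
def confluent_client_config(bootstrap_servers, api_key, api_secret, group_id=None):
    """librdkafka-style settings for a Confluent Cloud cluster using an API key and secret."""
    config = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": api_secret,
    }
    if group_id is not None:
        # Consumer-only settings: a consumer group, and start from the earliest
        # offset when the group has no committed position yet.
        config["group.id"] = group_id
        config["auto.offset.reset"] = "earliest"
    return config


def print_topic(config, topic):
    """Poll a topic until interrupted and print each raw message value.

    Requires the confluent-kafka package and network access.
    """
    # Imported here so the config helper above works without the package installed.
    from confluent_kafka import Consumer

    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # No message within the timeout.
            if msg.error():
                print(f"consumer error: {msg.error()}")
                continue
            print(msg.value())
    finally:
        consumer.close()
```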
This requires a key scoped to the cluster itself. Navigate to your cluster, open API Keys and create a new key. Because this key is scoped to the cluster, a producer can use it to write events. Download the key and copy the API key and API secret into your .env file as
Now, open a second terminal window and run the following Python code. The code first gets the data contract from the schema registry and uses it to ensure that each message being sent contains all the correct fields. The data then flows through the base stream and the derived streams, demonstrating how data moves through real-time data processing systems.
This code creates 20 high-value claims, which show up in the base stream. They also show up in the provider spike stream, because the code generates 10 high-value claims in a short time span for each of 2 different providers.
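Test claims like these could be generated along the following lines. This hypothetical helper produces 10 claims for each of 2 providers, with amounts above USD 10,000, audited procedure codes and timestamps a few milliseconds apart, which is enough to pass both the high-value filter and a 10-minute spike window. The field and provider names are illustrative. Serializing the claims against the data contract (for example with the JSONSerializer in confluent-kafka's schema registry module) is left to the producer.

```python
import random
import time
import uuid


def make_test_claims(provider_ids=("prov-001", "prov-002"), per_provider=10,
                     procedure_codes=("7371", "2710", "1831"), seed=42):
    """Build per_provider high-value claims for each provider, all within a few ms of now."""
    rng = random.Random(seed)  # Seeded so amounts and codes are repeatable.
    now_ms = int(time.time() * 1000)
    claims = []
    for provider_id in provider_ids:
        for i in range(per_provider):
            claims.append({
                "claim_id": str(uuid.uuid4()),
                "provider_id": provider_id,
                "procedure_code": rng.choice(procedure_codes),
                # Above both the USD 10,000 high-value and USD 5,000 audit thresholds.
                "claim_amount": round(rng.uniform(10_001, 50_000), 2),
                "timestamp": now_ms + i,  # Milliseconds since the epoch.
            })
    return claims
```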
In the terminal window that reads the derived topics, you’ll see the provider spike captured from the streamed medical claims and printed to the terminal.
Another key offering of Confluent Cloud is Tableflow, which lets you quickly create Apache Iceberg tables, with their metadata and snapshots, from the messages stored in Kafka topics. Iceberg is an open table format that tracks data files through metadata and snapshots, so the same table can be read by many different engines. These tables can be used by analytics engines such as IBM’s watsonx.data® and by other providers such as Snowflake, or stored in object storage such as Amazon S3.
Go to the topics view in Confluent Cloud and select the generated high-value claims topic. The name generated by the ksqlDB operation will be something like
Open the topic and click ‘Enable Tableflow’ in the upper right. Select ‘Iceberg’ and ‘Use Confluent Storage’.
This stores Iceberg snapshots of all events, along with the associated metadata, creating a widely compatible data asset for consumption by Iceberg-compatible analytics engines or for storage in a data center.
To query the Iceberg datasets you created, you’ll need to copy the environment ID from the environment > details view and the organization ID from your organizations. Save this data to your .env file as
Then, copy the organization ID from your organization tab:
Save this to your .env file as
You can test the Iceberg table creation with the following code:
This shows all the data that has been stored in the Iceberg table.
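Outside the Confluent Cloud UI, one way to read a Tableflow table is through its Iceberg REST catalog with PyIceberg. This sketch rests on assumptions that you should verify in the Tableflow documentation: the catalog URL format shown (which is why the organization and environment IDs are needed), that the catalog accepts the API key and secret as the credential property, and that tables are addressed as the Kafka cluster ID followed by a dot and the topic name. Reading requires pyiceberg with pandas support and network access; only the URL builder runs offline.

```python
def tableflow_catalog_uri(region, org_id, env_id):
    """Assumed format of the Tableflow Iceberg REST catalog URL on AWS; verify before use."""
    return (
        f"https://tableflow.{region}.aws.confluent.cloud"
        f"/iceberg/catalog/organizations/{org_id}/environments/{env_id}"
    )


def read_table(region, org_id, env_id, api_key, api_secret, cluster_id, topic):
    """Load a Tableflow table with PyIceberg and return it as a pandas DataFrame."""
    # Imported here so the URL builder works without pyiceberg installed.
    from pyiceberg.catalog import load_catalog

    catalog = load_catalog(
        "tableflow",
        **{
            "type": "rest",
            "uri": tableflow_catalog_uri(region, org_id, env_id),
            # Assumption: the API key and secret act as OAuth2 client credentials.
            "credential": f"{api_key}:{api_secret}",
            "s3.region": region,
        },
    )
    # Assumption: namespace = Kafka cluster ID, table name = topic name.
    table = catalog.load_table(f"{cluster_id}.{topic}")
    return table.scan().to_pandas()
```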
Finally, you might want to enable Wellspring Health to view a data visualization dashboard for the high-value claims and provider claim spike datasets. To build it, you’ll create an app by using the Streamlit framework that uses the data pipelines enabled by Confluent Cloud to provide a dashboard view to stakeholders.
First, add some configuration fields for the user:
Next, retrieve the schemas from the schema registry. You’ll need to get the generated names of your derived topics from the topics view in Confluent Cloud.
Next, create a streaming function to grab updates from Confluent:
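A streaming function for the dashboard can be written as a Python generator that polls a subscribed consumer and yields decoded records. This sketch assumes JSON values; when a value starts with the Confluent schema registry header (a zero magic byte followed by a 4-byte schema ID), the header is skipped before parsing. The consumer can be any object with a poll() method, such as a confluent-kafka Consumer, and max_empty_polls is a hypothetical parameter for stopping when the topic goes quiet.

```python
import json

# Confluent wire format: 1 magic byte (0) + 4-byte big-endian schema ID, then the payload.
_HEADER_LENGTH = 5


def decode_value(raw):
    """Decode a JSON message value, skipping the schema registry header if present."""
    if len(raw) >= _HEADER_LENGTH and raw[:1] == b"\x00":
        raw = raw[_HEADER_LENGTH:]
    return json.loads(raw)


def stream_records(consumer, max_empty_polls=None):
    """Yield decoded records from a subscribed consumer, skipping errors and empty values.

    Runs forever by default; stops after max_empty_polls consecutive empty polls if set.
    """
    empty = 0
    while max_empty_polls is None or empty < max_empty_polls:
        msg = consumer.poll(1.0)
        if msg is None:
            empty += 1
            continue
        empty = 0
        if msg.error() or msg.value() is None:
            continue  # Skip errors and tombstones.
        yield decode_value(msg.value())
```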
Finally, start the stream when the user enters credentials, and display each record as it arrives along with running totals of the number of claims and the claim amounts.
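The running totals can be kept in a small class that the Streamlit loop updates for every record. This sketch assumes the amount field is named claim_amount (a hypothetical name), and it counts a record without an amount toward the number of claims only.

```python
from dataclasses import dataclass


@dataclass
class ClaimTotals:
    """Running totals for the dashboard: number of claims and sum of claim amounts."""
    count: int = 0
    total_amount: float = 0.0

    def add(self, record):
        # 'claim_amount' is a hypothetical field name; use the one in your data contract.
        self.count += 1
        self.total_amount += float(record.get("claim_amount", 0))
```

In the app, you might create two placeholders with st.empty() before the loop and, after each call to totals.add(record), call metric(...) on each placeholder with totals.count and totals.total_amount so the numbers update in place as records arrive.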
You can run this Streamlit app by using the following command in your Python environment:
This tutorial shows how a data streaming platform like Confluent Cloud can centralize and simplify Kafka deployments and streamline data collection, validation and storage. Enforcing schemas and generating Iceberg tables automatically don’t require custom instrumentation or external storage. Together, these capabilities let high volumes of data be processed with low latency, enabling near real-time insights from many different data sources.