Event Streams is directly integrated with the SQL Query service — this tutorial shows you how to run a fully managed stream data ingestion from Kafka into Parquet on Cloud Object Storage.
Machine-generated data, arriving as a real-time stream, is the kind of big data with the most prevalent growth today. This includes telemetry data of the physical world, also called IoT data, as well as telemetry data of your business, your customers and your IT (e.g., clickstreams of user interactions, or your events, metrics and application logs). This type of data is becoming increasingly mission-critical for competitive decision-making, acceleration and automation. To analyze it, you need a platform that supports real-time data streaming and processing for analytics.
IBM Cloud provides both: a real-time Kafka messaging service called IBM Event Streams and a cloud data lake service called IBM SQL Query, which leverages IBM Cloud Object Storage for data lake persistence. Since June 2021, Event Streams is directly integrated with the SQL Query service, allowing you to run fully managed stream data ingestion from Kafka into Parquet on Cloud Object Storage.
This is a getting-started tutorial for using this function. It doesn't assume that you already have a Kafka message stream or a cloud data lake in place. So, you can use it to effectively start from scratch (e.g., in PoC and demo situations).
We will walk you through the following in this tutorial:
- The setup of Event Streams.
- A method to produce and experiment with message data in Event Streams.
- The setup of the cloud data lake.
- The setup of the stream landing from Event Streams into the cloud data lake.
- How to query the landed data.
- At the very end, we also show a method to produce larger volumes of messages so that you can conduct experiments at scale.
1. Set up your Kafka Service: Event Streams
Look for Event Streams in the IBM Cloud services catalog and provision an instance in your account:
Next, navigate to the dashboard for this new service instance and select Topics from the left navigation. Click Create topic to create a new topic with default options:
Select Service credentials from the left navigation pane and create a new credential by clicking New credential:
Set up via CLI
- Log in with the CLI (all four commands are sketched after this list).
- Create a new Event Streams service instance.
- Create service credentials for the new Event Streams instance.
- Create a topic in the new Event Streams instance.
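A minimal sketch of these four steps, assuming the Event Streams CLI plugin is installed (`ibmcloud plugin install event-streams`); the instance, credential and topic names as well as the `us-south` region and `standard` plan are placeholders you may want to change:

```sh
# Log in to IBM Cloud and target a resource group
ibmcloud login
ibmcloud target -g default

# Provision an Event Streams instance (the service's catalog name is messagehub)
ibmcloud resource service-instance-create my-event-streams messagehub standard us-south

# Create service credentials with the Manager role for the new instance
ibmcloud resource service-key-create my-es-credentials Manager --instance-name my-event-streams

# Point the Event Streams CLI plugin at the new instance (select it when prompted)
ibmcloud es init

# Create a topic (flag names can differ slightly between plugin versions)
ibmcloud es topic-create my-topic --partitions 1
```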
2. Send hand-crafted test messages
In this tutorial, we assume that you don't already have an application that is producing a Kafka message feed in your Event Streams instance. Therefore, we use a versatile command line tool that allows you to conduct all sorts of Kafka interactions from your shell in an interactive manner: kafkacat.
First, we must install kafkacat. If you're on a Mac OS system, you can do so with the following:
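For example, with Homebrew:

```sh
brew install kafkacat
# Note: in newer Homebrew versions the tool and formula have been renamed to kcat:
# brew install kcat
```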
For further install options and setup on Linux or Windows, refer to the README.
We create a file ~/.config/kafkacat.conf with the connection parameters of our Event Streams instance. Specify the following content in this file:
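A sketch of the expected content; the broker list and the password are placeholders that must be replaced with the values from your own service credentials (for Event Streams, the SASL username is literally `token` and the password is the API key):

```
bootstrap.servers=broker-0-<id>.kafka.svc0x.us-south.eventstreams.cloud.ibm.com:9093,broker-1-<id>...:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=token
sasl.password=<apikey from the service credentials>
```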
You can find the required values for bootstrap servers and password in the Service credentials section of your Event Streams instance dashboard:
Make sure to specify the bootstrap.servers as a comma-separated list without brackets and newline characters.
As an alternative to the cloud dashboard, you can retrieve the Event Streams service credentials with the CLI tool (the two commands are sketched after this list).
- Identify the name of the service credentials:
- Display the service credentials:
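A sketch of these two lookups, assuming the instance and credential names from the earlier CLI setup:

```sh
# List the service credentials (keys) of the instance to identify the credential name
ibmcloud resource service-keys --instance-name my-event-streams

# Display the credential details, including kafka_brokers_sasl and the API key
ibmcloud resource service-key my-es-credentials
```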
Next, we can try to display the messages in our Event Streams topic with kafkacat in consumer mode (-C):
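For example (my-topic is the placeholder topic name; recent kafkacat versions automatically pick up ~/.config/kafkacat.conf, and -e exits once the end of the topic is reached):

```sh
kafkacat -C -t my-topic -e
```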
Since our topic is still fresh and empty, there are 0 records to display.
Send your first messages with kafkacat
Submit some messages ad-hoc by running kafkacat in producer mode (-P) and typing the message content into stdin:
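A minimal example; the JSON payloads are just placeholders typed interactively, and the input is ended with Ctrl-D:

```sh
kafkacat -P -t my-topic
{"greeting": "hello", "count": 1}
{"greeting": "world", "count": 2}
```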
All text that you enter before a newline character is sent as one message. In this case, we have deliberately entered proper JSON content. But a Kafka message can, in fact, be anything.
Display topic content with kafkacat
Now, we can again display the messages in our Event Streams topic:
As you see, the messages that we just submitted are in the topic now.
Display the offset for each message
Each message in a Kafka topic has an offset index, which is unique per partition of the topic. By default, you only have one partition, with partition ID 0. The following kafkacat command uses a format string to display the partition and offset number in front of each message:
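For example, using kafkacat's -f format tokens (%p is the partition, %o the offset and %s the message payload):

```sh
kafkacat -C -t my-topic -e -f 'Partition %p, Offset %o: %s\n'
```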
With the ibmcloud CLI tool, you can manually delete the oldest messages in a topic by specifying an offset up to which messages should be deleted:
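A sketch assuming the Event Streams CLI plugin provides a topic-delete-records command; the exact command and flag syntax depend on your plugin version, so check the built-in help first:

```sh
# Check the exact syntax in your plugin version
ibmcloud es topic-delete-records --help

# Hypothetical invocation (flag names are an assumption):
# delete all records in partition 0 up to offset 5
# ibmcloud es topic-delete-records my-topic --partition-offsets 0:5
```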
Send messages from a payload file with kafkacat
Instead of providing messages each time to stdin of kafkacat, you may want to send messages that you have prepared and stored in a payload file. You can do so with the -l option in kafkacat. Here is a sample payload file that you can take for your experiments. It contains a set of typical IoT messages in JSON format:
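A sketch with a hypothetical file name and message schema (the actual sample payload file may look different); each line of the file is sent as one message via -l:

```sh
cat > iot_messages.json <<'EOF'
{"deviceId": "sensor-01", "ts": "2021-06-01T12:00:00Z", "reading": {"temperature": 21.5, "humidity": 40}}
{"deviceId": "sensor-02", "ts": "2021-06-01T12:00:05Z", "reading": {"temperature": 22.1, "humidity": 38}}
EOF

# Produce every line of the file as an individual Kafka message
kafkacat -P -t my-topic -l iot_messages.json
```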
Let's also remove our previous manually submitted messages because they have a different JSON message format than the messages in the payload file we just submitted:
Check how many messages are in your topic now:
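One way is to consume the whole topic quietly and count the lines:

```sh
kafkacat -C -t my-topic -e -q | wc -l
```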
3. Set up your data lake services: IBM SQL Query and IBM Cloud Object Storage
The most affordable, versatile and scalable method to persist your Kafka message feed over longer periods of time is a cloud data lake on IBM Cloud Object Storage. So let's set up such a cloud data lake.
Search for Object Storage in the IBM Cloud services catalog and provision an instance in your account:
Once the instance is created, navigate to Buckets in the left navigation pane of the Object Storage service instance dashboard and click Create bucket. Provide a unique name for your new bucket. You can use the default options for the bucket, but make sure that you select the desired region where it should be created:
Next, we can provision our cloud data lake service, which basically means provisioning an instance of the SQL Query service:
When the service is provisioned, click Launch SQL Query UI to open the SQL interface, which performs a few useful automatic setup steps and initializes the cloud data lake:
Finally, search for Key Protect in the IBM Cloud services catalog and provision an instance in your account:
4. Configure stream landing
You are now ready to connect your Kafka topic to the cloud data lake via a stream landing setup. Navigate to the Event Streams dashboard and select Topics from the navigation pane on the left. Select the context menu for your topic and click Create stream landing configuration:
You are guided through a convenient wizard where you can select your Object Storage instance:
Next, select your Object Storage bucket:
In addition, select your SQL Query instance:
On the final screen, specify the service ID that should be created. It is the cloud user identity used by the stream landing job to connect to the Event Streams, SQL Query and Object Storage instances. The wizard ensures that all necessary access privileges are granted to the service ID. Note that you can also select an existing service ID, which makes sense when you set up additional stream landing jobs and want to share the same service ID among all of the jobs, instead of creating a new one every time. The wizard also creates an API key for the service ID and saves it to your Key Protect service instance for secure retrieval by the stream landing job. That's why you also need to select your Key Protect instance in the wizard. When you are done, click Start streaming data:
You now see the status Queued for your topic. It can take one or two minutes until the streaming job is fully dispatched and up and running, at which point the status switches to Running. In the context menu, you find a new option called View stream landing configuration:
When you click this option, you find the links to the SQL Query instance that is performing the stream landing and the Object Storage bucket that receives the data:
Click on the SQL Query instance link to open the SQL Query web console. You will see the new stream landing job. You see the actual SQL statement that was submitted to SQL Query for the stream landing. It is a SELECT statement from your Event Streams instance and topic (identified via the unique CRN) and the selected data is emitted (EMIT) to your Object Storage bucket AS PARQUET format. The operation is executed (EXECUTE) with the service ID's API key that is stored in the Key Protect instance:
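The exact statement is generated for you by the wizard; its overall shape is roughly the following sketch, where all CRNs, bucket names and the Key Protect reference are placeholders and the precise grammar may differ:

```sql
-- Illustrative shape only; the wizard generates the exact statement
SELECT *
FROM <event-streams-instance-crn>/<topic>                        -- source: your Kafka topic
EMIT cos://us-south/<landing-bucket>/<prefix> STORED AS PARQUET  -- sink: Parquet objects
EXECUTE AS <key-protect-reference-to-the-service-id-api-key>
```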
Click the link in the Result location field, which opens the Object Storage web console with a filter set to the objects that are being written by that job. You see a couple of metadata objects that track state, such as the latest offset that has been consumed and landed. In addition, you can find the Parquet files with the actual payload data:
5. Query landed data
In the SQL Query UI, you can now also immediately analyze the landed data. For that, you can click the Query the result link:
This action generates a simple SELECT * sample query on the landed data. For our IoT sample data, add the table transformer FLATTEN() around the input location, as shown below, because the data contains nested columns and the default output format used for the sample query is CSV, which only supports flat columns:
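A sketch of such a query, assuming the hypothetical landing bucket and prefix used above; FLATTEN() turns the nested fields into flat columns so the result can be written as CSV:

```sql
SELECT *
FROM FLATTEN(cos://us-south/my-landing-bucket/my-topic STORED AS PARQUET)
LIMIT 20
```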
In the previous SQL, we simply queried the Object Storage location of the landed Parquet data. In a proper data lake, you should manage table definitions. You can easily set up a table for your landed streaming data with the following DDL. As you can see, the table schema is automatically inferred from the data on disk by default:
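A sketch of such a DDL, again assuming the hypothetical landing bucket and prefix; without an explicit column list, the schema is inferred from the Parquet data:

```sql
CREATE TABLE iot_landed_messages
USING PARQUET
LOCATION cos://us-south/my-landing-bucket/my-topic/
```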
Now, at last, let's do some aggregation on the landed IoT data:
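For example, a minimal aggregation over the hypothetical table and columns sketched above (adjust the names to your actual schema):

```sql
SELECT deviceId,
       COUNT(*)                 AS message_count,
       AVG(reading.temperature) AS avg_temperature
FROM iot_landed_messages
GROUP BY deviceId
```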
6. Produce larger volumes of test data messages
You can use kafkacat to flexibly produce custom test messages as we have described in detail in Section 2 above. However, when you want to simulate larger message throughput volumes, the kafkacat method is not ideal. But there is another tool that does this job well: event-streams-load-generator.
Set up event-streams-load-generator
Next, submit a desired number of messages that are sampled from a provided message payload file. For your experiments, use the same sample payload file that we also used earlier with kafkacat. Let's submit 100,000 messages based on the sample messages in that file:
Let's verify how many messages are in our topic: