Tutorial: Stream Landing from Event Streams Kafka Service to IBM Cloud Data Lake on Object Storage

Event Streams is directly integrated with the SQL Query service — this tutorial shows you how to run a fully managed stream data ingestion from Kafka into Parquet on Cloud Object Storage.

Machine-generated data that arrives as a real-time stream is a kind of big data that is growing rapidly today. This includes telemetry data of the physical world, also called IoT data. It also includes telemetry data of your business, your customers and your IT (e.g., clickstreams of user interaction, events, metrics and application logs). This type of data is becoming increasingly mission-critical for competitive decision-making, acceleration and automation. To analyze this data, you need a platform that supports real-time data streaming and processing for analytics.

IBM Cloud provides you with both: a real-time Kafka messaging service called IBM Event Streams and a cloud data lake service called IBM SQL Query, which uses IBM Cloud Object Storage for data lake persistence. Since June 2021, Event Streams has been directly integrated with the SQL Query service, allowing you to run a fully managed stream data ingestion from Kafka into Parquet on Cloud Object Storage.

This is a getting-started tutorial for using this function. It doesn't assume that you already have a Kafka message stream or a cloud data lake in place. So, you can use it to effectively start from scratch (e.g., in PoC and demo situations).

We will walk you through the following in this tutorial:

  1. The setup of Event Streams.
  2. A method to produce and experiment with message data in Event Streams.
  3. The setup of the cloud data lake.
  4. The setup of the stream landing from Event Streams into the cloud data lake.
  5. How to query the landed data.
  6. A method to produce larger volumes of messages so that you can also conduct experiments at scale.

1. Set up your Kafka Service: Event Streams

Look for Event Streams in the IBM Cloud services catalog and provision an instance in your account:

Next, navigate to the dashboard for this new service instance and select Topics from the left navigation. Click Create topic to create a new topic with default options:

Select Service credentials from the left navigation pane and create a new credential by clicking New credential:

Set up via CLI

Alternatively, you can also perform these steps on the command line with the ibmcloud CLI tool. Ensure that you have the Event Streams CLI plugin installed and initialized:

  1. Log in with the CLI:
    ibmcloud login --sso
  2. Create a new Event Streams service instance:
    ibmcloud resource service-instance-create "Streaming Ingest" messagehub standard us-south
    Creating service instance Streaming Ingest in resource group default of account Torsten Steinbach's Account as torsten@de.ibm.com...
    OK
    Service instance Streaming Ingest was created.
    
    
    Name:             Streaming Ingest   
    ID:               crn:v1:bluemix:public:messagehub:us-south:a/d86af7367f70fba4f306d3c19c938f2f:23e2f9a2-0c23-4d44-b43e-6a603d3d08ef::   
    GUID:             23e2f9a2-0c23-4d44-b43e-6a603d3d08ef   
    Location:         us-south   
    State:            active   
    Type:             service_instance   
    Sub Type:            
    Allow Cleanup:    false   
    Locked:           false   
    Created at:       2021-06-30T12:02:16Z   
    Updated at:       2021-06-30T12:02:18Z   
    Last Operation:                                     
                      Status    create succeeded                        
                      Message   Completed create instance operation
  3. Create service credentials for the new Event Streams instance:
    ibmcloud resource service-key-create "Service credentials-1" Manager --instance-name "Streaming Ingest"
  4. Create a topic in the new Event Streams instance:
    ibmcloud es topic-create json_feed
    Created topic json_feed
    OK

2. Send hand-crafted test messages

In this tutorial, we assume that you don't already have an application that is producing a Kafka message feed in your Event Streams instance. Therefore, we use a versatile command line tool that lets you conduct all sorts of Kafka interactions interactively from your shell: kafkacat.

Install kafkacat

First, we must install kafkacat. If you're on macOS, you can do so with the following:

brew update
brew install kafkacat

For further installation options and setup on Linux or Windows, refer to the kafkacat README.

Configure kafkacat

We create a file ~/.config/kafkacat.conf with the parameters of our Event Streams instance. Specify the following content in this file:

bootstrap.servers=<value of property kafka_brokers_sasl: in your EventStreams Credentials>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.username=token
sasl.password=<value of property password in your EventStreams Credentials>

You can find the required values for bootstrap servers and password in the Service credentials section of your Event Streams instance dashboard:

Make sure to specify the bootstrap.servers as a comma-separated list without brackets and newline characters.
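
If you prefer to generate the configuration file rather than edit it by hand, the following is a minimal Python sketch. It assumes that you have the service credentials available as a dictionary in which kafka_brokers_sasl is a list of broker addresses and password holds the API key. The function name render_kafkacat_conf and the example broker names are hypothetical and only serve as an illustration.

```python
def render_kafkacat_conf(credentials: dict) -> str:
    """Build the text of ~/.config/kafkacat.conf from a credentials dict."""
    # The brokers arrive as a list; kafkacat expects a single
    # comma-separated string without brackets or newline characters.
    brokers = ",".join(b.strip() for b in credentials["kafka_brokers_sasl"])
    lines = [
        f"bootstrap.servers={brokers}",
        "sasl.mechanism=PLAIN",
        "security.protocol=SASL_SSL",
        "sasl.username=token",
        f"sasl.password={credentials['password']}",
    ]
    return "\n".join(lines) + "\n"


# Hypothetical credentials, for illustration only.
example = {
    "kafka_brokers_sasl": [
        "broker-0.example.com:9093",
        "broker-1.example.com:9093",
    ],
    "password": "my-api-key",
}
print(render_kafkacat_conf(example), end="")
```

You can write the returned text to ~/.config/kafkacat.conf. The sketch only joins and formats values; it does not validate them.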

As an alternative to the cloud dashboard, you can retrieve the Event Streams service credentials with the CLI tool:

  1. Identify the name of the service credentials:
    ibmcloud resource service-keys --instance-name "Streaming Ingest"
    Retrieving all service keys in all resource groups under account Torsten Steinbach's Account as torsten@de.ibm.com...
    OK
    Name                    State    Created At   
    Service credentials-1   active   Tue Apr 27 09:02:06 UTC 2021
  2. Display the service credentials:
    ibmcloud resource service-key "Service credentials-1"

Test kafkacat

Next, we can try to display the messages in our Event Streams topic with kafkacat in consumer mode (-C):

kafkacat -C -e -t json_feed
% Reached end of topic json_feed [0] at offset 0: exiting

Since our topic is still fresh and empty, there are 0 records to display. 

Send your first messages with kafkacat

Submit some messages ad-hoc by running kafkacat in producer mode (-P) and typing the message content into stdin:

kafkacat -P -t json_feed 
{"submitter": "torsten", "text": "Hello Kafka"}
{"submitter": "rick", "text": "Whatever"}
{"submitter": "morty", "text": "Hello Jessica", "priority": "high"}
^C

All text that you enter before a newline character is sent as one message. In this case, we have deliberately entered proper JSON content. But a Kafka message can, in fact, be anything.
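
Because a newline ends a message, each JSON document must stay on a single line. The following Python sketch shows one way to prepare such input; the function name to_message_lines is hypothetical. It relies on json.dumps, which escapes newlines inside string values, so one record always becomes exactly one line.

```python
import json


def to_message_lines(records):
    """Serialize each record as single-line JSON, one message per line."""
    lines = []
    for record in records:
        # Without an indent argument, json.dumps emits no raw newlines;
        # a newline inside a string value is written as the escape \n.
        lines.append(json.dumps(record))
    return "\n".join(lines) + "\n"


records = [
    {"submitter": "torsten", "text": "Hello Kafka"},
    {"submitter": "morty", "text": "Hello\nJessica", "priority": "high"},
]
print(to_message_lines(records), end="")
```

Assuming that you save this as a script, you could pipe its output into kafkacat -P -t json_feed instead of typing the messages by hand.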

Display topic content with kafkacat

Now, we can again display the messages in our Event Streams topic:

kafkacat -C -e -t json_feed
{"submitter": "torsten", "text": "Hello Kafka"}
{"submitter": "rick", "text": "Whatever"}
{"submitter": "morty", "text": "Hello Jessica", "priority": "high"}
% Reached end of topic json_feed [0] at offset 3: exiting

As you see, the messages that we just submitted are in the topic now.

Display the offset for each message

Each message in a Kafka topic has an offset index. It is unique per partition of the topic. By default, you only have one partition with partition ID 0. The following kafkacat command uses a format string to display the partition and offset number in front of each message:

kafkacat -C -e -t json_feed -f '%p:%o\t%s\n'
0:1       {"submitter": "torsten", "text": "Hello Kafka"}
0:2        {"submitter": "rick", "text": "Whatever"}
0:3        {"submitter": "morty", "text": "Hello Jessica", "priority": "high"}
% Reached end of topic json_feed [0] at offset 3: exiting
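
If you want to process this output in a script, the following Python sketch splits one formatted line back into its parts. It assumes exactly the format string used above (partition, colon, offset, tab, payload); the names Record and parse_formatted_line are hypothetical.

```python
from typing import NamedTuple


class Record(NamedTuple):
    partition: int
    offset: int
    payload: str


def parse_formatted_line(line: str) -> Record:
    r"""Parse one line written with the kafkacat format '%p:%o\t%s\n'."""
    # Split only at the first tab so that tabs inside the payload survive.
    position, payload = line.rstrip("\n").split("\t", 1)
    partition, offset = position.split(":", 1)
    return Record(int(partition), int(offset), payload)


rec = parse_formatted_line('0:2\t{"submitter": "rick", "text": "Whatever"}\n')
print(rec.partition, rec.offset, rec.payload)
```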

Delete messages

With the ibmcloud CLI tool, you can manually delete the oldest messages in a topic by specifying an offset up to which messages should be deleted:

ibmcloud es topic-delete-records  json_feed -p 0:2 -f
Deleted records on topic json_feed
OK

kafkacat -C -e -t json_feed -f '%p:%o\t%s\n'
0:2        {"submitter": "rick", "text": "Whatever"}
0:3        {"submitter": "morty", "text": "Hello Jessica", "priority": "high"}
% Reached end of topic json_feed [0] at offset 3: exiting
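
The transcript above suggests that the offset passed with -p is exclusive: records with a lower offset are removed, and the record at that offset is kept. The following Python sketch models that rule. It is an assumption drawn from the output shown here, and the function name offsets_after_delete is hypothetical.

```python
def offsets_after_delete(offsets, before_offset):
    """Return the offsets that remain when all lower offsets are deleted."""
    return [offset for offset in offsets if offset >= before_offset]


# Offsets 1, 2 and 3 as displayed above, deleting up to offset 2.
print(offsets_after_delete([1, 2, 3], 2))  # → [2, 3]
```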

Send messages from a payload file with kafkacat

Instead of providing messages each time to stdin of kafkacat, you may want to send messages that you have prepared and stored in a payload file. You can do so with the -l option in kafkacat. Here is a sample payload file that you can take for your experiments. It contains a set of typical IoT messages in JSON format:

kafkacat -P -t json_feed -l ./sample-payload-iotmessages.json
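
Before sending your own payload file, it can help to check that every line is a valid JSON document. The following Python sketch does that under the assumption that the file holds one message per line; the function name check_payload_lines is hypothetical.

```python
import json


def check_payload_lines(lines):
    """Return (valid_count, bad_line_numbers) for newline-delimited JSON."""
    valid, bad = 0, []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # blank lines are ignored by this check
        try:
            json.loads(text)
            valid += 1
        except json.JSONDecodeError:
            bad.append(number)
    return valid, bad


sample = ['{"sensor": "a", "value": 1}\n', "not json\n", '{"sensor": "b"}\n']
print(check_payload_lines(sample))  # → (2, [2])
```

To check a real file, pass the open file object instead of the sample list, because iterating over a file yields its lines.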

Let's also remove the messages that we submitted manually earlier, because they have a different JSON format than the messages in the payload file we just submitted:

ibmcloud es topic-delete-records  json_feed -p 0:4 -f
Deleted records on topic json_feed
OK

Check how many messages are in your topic now:

kafkacat -C -e -t json_feed | wc -l
% Reached end of topic json_feed [0] at offset 1155
    1152

3. Set up your data lake services: IBM SQL Query and IBM Cloud Object Storage

The most affordable, versatile and scalable method to persist your Kafka message feed over longer periods of time is a cloud data lake on IBM Cloud Object Storage. So let's set up such a cloud data lake.

Search for Object Storage in the IBM Cloud services catalog and provision an instance in your account:

Once the instance is created, navigate to Buckets in the left navigation pane of the Object Storage service instance dashboard and click Create bucket. Provide a unique name for your new bucket. You can use the default options for the bucket, but make sure that you select the desired region where it should be created:

Next, we can provision our cloud data lake service, which basically means provisioning an instance of the SQL Query service:

When the service is provisioned, click Launch SQL Query UI to open the SQL interface. It performs a few useful automatic setup steps, after which the cloud data lake is initialized:

Finally, search for Key Protect in the IBM Cloud services catalog and provision an instance in your account:

4. Configure stream landing

You are now ready to connect your Kafka topic to the cloud data lake via a stream landing setup. Navigate to the Event Streams dashboard and select Topics from the navigation pane on the left. Select the context menu for your topic and click Create stream landing configuration:

You are guided through a convenient wizard where you can select your Object Storage instance:

Next, select your Object Storage bucket:

In addition, select your SQL Query instance:

On the final screen, specify the service ID that should be created. It is the cloud user identity used by the stream landing job to connect to the Event Streams, SQL Query and Object Storage instances. The wizard ensures that all necessary access privileges are granted to the service ID. Note that you can also select an existing service ID, which makes sense when you set up additional stream landing jobs and want to share the same service ID among all of the jobs, instead of creating a new one every time. The wizard also creates an API key for the service ID and saves it to your Key Protect service instance for secure retrieval by the stream landing job. That's why you also need to select your Key Protect instance in the wizard. When you are done, click Start streaming data:

You now see the status Queued for your topic. It can take one or two minutes until the streaming job is fully dispatched and up and running. You will see the status switch to Running at that point. In the context menu, you find a new option called View stream landing configuration:

When you click this option, you find the links to the SQL Query instance that is performing the stream landing and the Object Storage bucket that receives the data:

Click on the SQL Query instance link to open the SQL Query web console. You will see the new stream landing job, along with the actual SQL statement that was submitted to SQL Query for the stream landing. It is a SELECT statement that reads from your Event Streams instance and topic (identified via the unique CRN) and emits (EMIT) the selected data to your Object Storage bucket in Parquet format (AS PARQUET). The operation is executed (EXECUTE) with the service ID's API key that is stored in the Key Protect instance:

Click on the link in the Result location field, which opens the Object Storage web console with a filter set to the objects that are being written by that job. You see that there are a couple of metadata objects that track state, such as the latest offset that has been consumed and landed. In addition, you can find the Parquet files with the actual payload data:

5. Query landed data

In the SQL Query UI, you can now immediately analyze the landed data. To do so, click the Query the result link:

This action generates a simple SELECT * sample query on the landed data. For our IoT sample data, wrap the input location in the table transformer FLATTEN(), because the data contains nested columns and the default output format used for the sample query is CSV, which only supports flat columns.
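
To illustrate what flattening nested columns means, here is a conceptual Python sketch. It is not the SQL Query implementation. The function name flatten, the sample field names and the underscore used to join column names are all assumptions made for this illustration.

```python
def flatten(record, parent_key="", sep="_"):
    """Turn a nested dict into a flat dict with joined column names."""
    flat = {}
    for key, value in record.items():
        name = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested structures, carrying the name prefix along.
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# Hypothetical IoT message, for illustration only.
nested = {
    "device": {"id": "sensor-1", "location": {"lat": 48.7, "lon": 9.0}},
    "temp": 21.5,
}
print(flatten(nested))
# → {'device_id': 'sensor-1', 'device_location_lat': 48.7, 'device_location_lon': 9.0, 'temp': 21.5}
```

Every value in the result is a plain scalar, which is the shape that a flat format such as CSV can hold.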

In the previous SQL, we simply queried the Object Storage location of the landed Parquet data. In a proper data lake, you should manage table definitions. You can easily set up a table for your landed streaming data with the following DDL. As you can see, the table schema is automatically inferred from the data on disk by default:

Now, at last, let's do some aggregation on the landed IoT data:

6. Produce larger volumes of test messages

You can use kafkacat to flexibly produce custom test messages as we have described in detail in Section 2 above. However, when you want to simulate larger message throughput volumes, the kafkacat method is not ideal. But there is another tool that does this job well: event-streams-load-generator.

Set up event-streams-load-generator

Download the es-producer.jar file and create a file producer.config in the same directory based on this template. Make sure to uncomment and configure the following properties:

bootstrap.servers=<value of property kafka_brokers_sasl: in your EventStreams Credentials>
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=token password=<value of property password in your EventStreams Credentials>;

Next, submit a desired number of messages that are sampled from a provided message payload file. For your experiments, use the same sample payload file that we also used earlier with kafkacat. Let's submit 100,000 messages based on the sample messages in that file:

java -jar es-producer.jar -t json_feed -n 100000  -f ./sample-payload-iotmessages.json
Reading payloads from: ./sample-payload-iotmessages.json
Number of messages read: 1152
:
2021-06-30 17:29:40,056 [producer0] INFO  org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
2021-06-30 17:29:40,281 [producer0] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
2021-06-30 17:29:40,281 [producer0] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
2021-06-30 17:29:40,281 [producer0] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1625066980276
2021-06-30 17:29:42,059 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: L7b8ASdOS3GQDek_Yr9AXw
7726 records sent, 1545.2 records/sec (0.40 MB/sec), 13.0 ms avg latency, 2992.0 ms max latency.
17809 records sent, 3536.3 records/sec (0.92 MB/sec), 8.6 ms avg latency, 166.0 ms max latency.
22575 records sent, 4515.0 records/sec (1.18 MB/sec), 6.9 ms avg latency, 109.0 ms max latency.
23039 records sent, 4599.5 records/sec (1.20 MB/sec), 6.7 ms avg latency, 67.0 ms max latency.
20204 records sent, 4036.0 records/sec (1.05 MB/sec), 7.6 ms avg latency, 274.0 ms max latency.
2021-06-30 17:30:07,450 [producer0] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
100000 records sent, 3673.094582 records/sec (0.96 MB/sec), 7.86 ms avg latency, 2992.00 ms max latency, 2 ms 50th, 34 ms 95th, 95 ms 99th, 162 ms 99.9th.
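
If you capture this output in a file, you can extract the throughput figures with a short script. The following Python sketch assumes that progress lines have exactly the layout shown above; the names INTERVAL and parse_progress are hypothetical.

```python
import re

# Matches lines such as "7726 records sent, 1545.2 records/sec (0.40 MB/sec), ..."
INTERVAL = re.compile(r"^(\d+) records sent, ([\d.]+) records/sec \(([\d.]+) MB/sec\)")


def parse_progress(log_lines):
    """Extract (records, records_per_sec, mb_per_sec) from progress lines."""
    rows = []
    for line in log_lines:
        match = INTERVAL.match(line)
        if match:  # timestamped log lines do not match and are skipped
            rows.append(
                (int(match.group(1)), float(match.group(2)), float(match.group(3)))
            )
    return rows


log = [
    "7726 records sent, 1545.2 records/sec (0.40 MB/sec), 13.0 ms avg latency, 2992.0 ms max latency.",
    "100000 records sent, 3673.094582 records/sec (0.96 MB/sec), 7.86 ms avg latency, "
    "2992.00 ms max latency, 2 ms 50th, 34 ms 95th, 95 ms 99th, 162 ms 99.9th.",
]
total, rate, _ = parse_progress(log)[-1]
print(f"{total} records in about {total / rate:.1f} s")  # → 100000 records in about 27.2 s
```

The last matching line is the summary for the whole run, so dividing its record count by its rate gives the approximate duration of the run.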

Let's verify how many messages are in our topic:

kafkacat -C -e -t json_feed | wc -l
% Reached end of topic json_feed [0] at offset 101155: exiting
  101152
