Data Processing with Spark Streaming

Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live streams of data, for example from log files or status update messages. HDFS directories, TCP sockets, and Kafka are some of the supported Spark Streaming data sources. The machine learning and graph processing methods in Spark can even be used on data streams and the processed data can be stored in databases and files.

Applies to :

Spark engine

Apache Gluten accelerated Spark engine

Spark Streaming takes live input data streams and breaks the streams up into batches, which are processed by the Spark engine to provide a final batch of results. For details, see the Apache Spark Streaming Programming Guide.

You can configure Spark log rotation and cleanup to prevent uncontrolled log growth and ensure system stability during long-running streaming workloads. This configuration includes executor log rotation, event log rolling, and automatic cleanup through the Spark History Server. For information, see Enable Spark log and event log rotation.

Required permissions
To submit Spark runtime, you must have the User role.

Integrating with Kafka

watsonx.data supports Kafka as a data source for real-time data streaming. The data is processed as it is streamed and can be stored in HDFS, databases or dashboards.

This section shows you how you can leverage Spark Streaming on watsonx.data with Kafka in a sample application called kafka-stream-example.py.

Sample Python Spark Streaming application:

#!/usr/bin/env python# coding: utf-8import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.types import*
import time

#create spark session
spark = SparkSession.builder.getOrCreate()

# Connect to kafka server and read data stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "CHANGEME_KAFKA_SERVER") \
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='CHANGEME_USERNAME' password='CHANGEME_PASSWORD';") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.protocol", "TLSv1.2") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2") \
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS") \
.option("subscribe", "CHANGEME_TOPIC") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df.printSchema()

# Write the input data to memory
query = df.writeStream.outputMode("append").format("memory").queryName("testk2s").option("partition.assignment.strategy", "range").start()

query.awaitTermination(30)

query.stop()

query.status

# Query data
test_result=spark.sql("select * from testk2s")
test_result.show(5)

spark.sql("select count(*) from testk2s").show()
test_result_small = spark.sql("select * from testk2s limit 5")
test_result_small.show()

Running your Spark applications

To run the Spark application kafka-stream-example.py using data that is streamed through Kafka, you need to preload the required Kafka and Spark streaming libraries to Spark.

watsonx.data offers multiple options to persist any libraries that you might need in your applications, including your application file. For details, see Customizing Spark applications in a service volume instance.

In the following example, you will upload the required libraries and the Spark application file to a volume service instance.

To run the Spark application kafka-stream-example.py:

  1. Download the following Python packages from Maven:

  2. Upload the packages and the Spark application file to a volume service instance:

    • By using the API. For instructions, see Customizing Spark applications in a service volume instance.

    • Through the user interface:

      1. From the navigation menu, click Services > Instances, find the service volume instance and click it to view the instance details.
      2. Click the File browser tab, and upload your Spark application and the Spark Streaming JAR files you downloaded.
  3. Prepare the Spark application payload.

    You need to define the volumes section in the payload and add the volume service instance and mount details to load the required Python packages before the Spark application starts.

    In the following sample payload.json, the Spark application kafka-stream-example.py and the Kafka libraries are stored in the data-vol volume that is mounted to /myapp. The Python application and the comma-separated list of JARs included in the jars option are automatically transferred to the cluster.

    {"application_details":{"application":"/myapp/kafka-stream-example.py","arguments":[""],"conf":{"spark.app.name":"SparkStreams","spark.eventLog.enabled":"true"},"jars":"/myapp/spark-sql-kafka-0-10_2.12-3.3.0.jar,/myapp/spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar,/myapp/commons-pool2-2.11.1.jar"},"volumes": \[{"name":"data-vol","mount_path":"/myapp","source_sub_path":""}\]}
  4. Submit the PySpark application. For details, see Submitting Spark runtimes by accessing storage.