Data Processing with Spark Streaming
Applies to:
Spark engine
Apache Gluten accelerated Spark engine
Spark Streaming takes live input data streams and divides them into batches, which the Spark engine processes to generate the final stream of results in batches. For details, see the Apache Spark Streaming Programming Guide.
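The following pure-Python sketch makes the micro-batch model concrete. It groups timestamped events into fixed-length batch intervals, which is roughly what a batch interval means in Spark Streaming. It is only a conceptual illustration and does not show how Spark implements batching internally. The function name `micro_batches` and the `(timestamp, value)` event shape are assumptions made for this example.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def micro_batches(
    events: Iterable[Tuple[float, str]], interval: float
) -> List[Tuple[float, List[str]]]:
    """Group (timestamp, value) events into batches of `interval` seconds.

    Hypothetical helper for illustration only: each event lands in the batch
    whose window [start, start + interval) contains its timestamp.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    batches: Dict[float, List[str]] = defaultdict(list)
    for ts, value in events:
        # Align the event to the start of its batch window.
        start = (ts // interval) * interval
        batches[start].append(value)
    # Return the batches in time order, the order a streaming engine processes them.
    return sorted(batches.items())


# Usage: four events that span three one-second batch windows.
events = [(0.2, "a"), (0.9, "b"), (1.1, "c"), (2.5, "d")]
for start, values in micro_batches(events, interval=1.0):
    print(start, values)
```

In Spark, each batch is handed to the engine as a unit of work. That is why the batch interval controls both latency and per-batch overhead.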
You can configure Spark log rotation and cleanup to prevent uncontrolled log growth and ensure system stability during long-running streaming workloads. This configuration includes executor log rotation, event log rolling, and automatic cleanup through the Spark History Server. For details, see Enable Spark log and event log rotation.
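The sketch below collects standard Apache Spark property names for executor log rolling and event log rolling into Python dictionaries. These could then be merged into the `conf` section of an application payload. The values are placeholders, not recommendations. Whether watsonx.data honors each property when it is set per application is an assumption, so follow Enable Spark log and event log rotation for the supported procedure. The `spark.eventLog.rolling.*` properties require Spark 3.0 or later. The `spark.history.fs.cleaner.*` settings belong to the Spark History Server configuration, so they appear here only for reference. The helper name `with_log_rotation` is hypothetical.

```python
# Standard Apache Spark properties for log rotation. Values are illustrative
# placeholders; whether a given platform accepts them per application is an assumption.
EXECUTOR_LOG_ROLLING = {
    # Roll executor stdout/stderr logs once a file reaches a size limit.
    "spark.executor.logs.rolling.strategy": "size",
    "spark.executor.logs.rolling.maxSize": str(128 * 1024 * 1024),  # in bytes
    # Keep only the newest rolled files; older ones are deleted.
    "spark.executor.logs.rolling.maxRetainedFiles": "5",
}

EVENT_LOG_ROLLING = {
    "spark.eventLog.enabled": "true",
    # Split the event log into multiple files instead of one ever-growing file.
    "spark.eventLog.rolling.enabled": "true",
    "spark.eventLog.rolling.maxFileSize": "128m",
}

# Spark History Server settings: these are configured on the History Server,
# not in an individual application's conf, and are listed only for reference.
HISTORY_SERVER_CLEANUP = {
    "spark.history.fs.cleaner.enabled": "true",
    "spark.history.fs.cleaner.interval": "1d",
    "spark.history.fs.cleaner.maxAge": "7d",
}


def with_log_rotation(app_conf: dict) -> dict:
    """Return a copy of app_conf with the per-application rolling settings added.

    Keys already present in app_conf take precedence over the defaults above.
    """
    return {**EXECUTOR_LOG_ROLLING, **EVENT_LOG_ROLLING, **app_conf}


conf = with_log_rotation({"spark.app.name": "SparkStreams"})
print(conf["spark.eventLog.rolling.enabled"])
```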
- Required permissions
  - To submit Spark runtimes, you must have the User role.
Integrating with Kafka
watsonx.data supports Kafka as a data source for real-time data streaming. The data is processed as it arrives, and the results can be written to HDFS, databases, or dashboards.
This section shows how to use Spark Streaming on watsonx.data with Kafka in a sample application called kafka-stream-example.py.
Sample Python Spark Streaming application:
#!/usr/bin/env python
# coding: utf-8
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
# Create a Spark session
spark = SparkSession.builder.getOrCreate()
# Connect to the Kafka server and read the data stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "CHANGEME_KAFKA_SERVER") \
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='CHANGEME_USERNAME' password='CHANGEME_PASSWORD';") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.protocol", "TLSv1.2") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2") \
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS") \
.option("subscribe", "CHANGEME_TOPIC") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
# Write the input data to an in-memory table named testk2s
query = df.writeStream.outputMode("append").format("memory").queryName("testk2s").option("partition.assignment.strategy", "range").start()
# Let the query run for up to 30 seconds, then stop it
query.awaitTermination(30)
query.stop()
print(query.status)
# Query data
test_result=spark.sql("select * from testk2s")
test_result.show(5)
spark.sql("select count(*) from testk2s").show()
test_result_small = spark.sql("select * from testk2s limit 5")
test_result_small.show()
Running your Spark applications
To run the Spark application kafka-stream-example.py on data that is streamed through Kafka, you must preload the required Kafka and Spark Streaming libraries into Spark.
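The required libraries are published on Maven Central. The repository layout derives a JAR's download URL from its group ID, artifact ID, and version. The sketch below computes the URLs for the three JAR files listed in the steps that follow. The helper name `maven_central_url` is hypothetical. The group IDs (`org.apache.spark` for the Spark connectors, `org.apache.commons` for commons-pool2) are the ones these artifacts are published under.

```python
from typing import NamedTuple

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


class Artifact(NamedTuple):
    group_id: str
    artifact_id: str
    version: str


def maven_central_url(a: Artifact) -> str:
    """Return the Maven Central download URL of an artifact's main JAR."""
    # Maven repository layout: dots in the group ID become path separators.
    group_path = a.group_id.replace(".", "/")
    return (f"{MAVEN_CENTRAL}/{group_path}/{a.artifact_id}/{a.version}/"
            f"{a.artifact_id}-{a.version}.jar")


REQUIRED_JARS = [
    Artifact("org.apache.spark", "spark-sql-kafka-0-10_2.12", "3.3.0"),
    Artifact("org.apache.spark", "spark-streaming-kafka-0-10-assembly_2.12", "3.3.0"),
    Artifact("org.apache.commons", "commons-pool2", "2.11.1"),
]

for artifact in REQUIRED_JARS:
    print(maven_central_url(artifact))
```

You can fetch each URL with any HTTP client, for example `urllib.request.urlretrieve(url, filename)` from the Python standard library.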
watsonx.data offers multiple options to persist any libraries that you might need in your applications, including your application file. For details, see Customizing Spark applications in a service volume instance.
In the following example, you upload the required libraries and the Spark application file to a service volume instance.
To run the Spark application kafka-stream-example.py:
- Download the following JAR files from Maven:
  - spark-sql-kafka (Spark version 3.3): spark-sql-kafka-0-10_2.12-3.3.0.jar
  - spark-streaming-kafka (Spark version 3.3): spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar
  - commons-pool2 (Spark version 3.3): commons-pool2-2.11.1.jar
- Upload the JAR files and the Spark application file to the service volume instance:
  - By using the API. For instructions, see Customizing Spark applications in a service volume instance.
  - Through the user interface:
    - From the navigation menu, click Services > Instances, find the service volume instance, and click it to view the instance details.
    - Click the File browser tab, and upload your Spark application and the Spark Streaming JAR files that you downloaded.
- Prepare the Spark application payload.
  Define the volumes section in the payload, and add the service volume instance and mount details so that the required libraries are loaded before the Spark application starts. In the following sample payload.json, the Spark application kafka-stream-example.py and the Kafka libraries are stored in the data-vol volume, which is mounted to /myapp. The Python application and the comma-separated list of JARs in the jars option are automatically transferred to the cluster.
  {
    "application_details": {
      "application": "/myapp/kafka-stream-example.py",
      "arguments": [""],
      "conf": {
        "spark.app.name": "SparkStreams",
        "spark.eventLog.enabled": "true"
      },
      "jars": "/myapp/spark-sql-kafka-0-10_2.12-3.3.0.jar,/myapp/spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar,/myapp/commons-pool2-2.11.1.jar"
    },
    "volumes": [
      {
        "name": "data-vol",
        "mount_path": "/myapp",
        "source_sub_path": ""
      }
    ]
  }
- Submit the PySpark application. For details, see Submitting Spark runtimes by accessing storage.
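You can also generate payload.json instead of editing it by hand. The following sketch builds the same payload as the sample in the steps above from a mount path, a volume name, and a list of JAR file names, and then serializes it with Python's `json` module. The function name `build_payload` and its parameters are hypothetical. The payload fields are copied from the sample.

```python
import json
from typing import Dict, List


def build_payload(mount_path: str, volume_name: str, app_file: str,
                  jar_files: List[str], conf: Dict[str, str]) -> dict:
    """Build a Spark application payload shaped like the payload.json sample."""
    return {
        "application_details": {
            "application": f"{mount_path}/{app_file}",
            "arguments": [""],
            "conf": conf,
            # The jars option is a single comma-separated string of mounted paths.
            "jars": ",".join(f"{mount_path}/{jar}" for jar in jar_files),
        },
        "volumes": [
            {"name": volume_name, "mount_path": mount_path, "source_sub_path": ""}
        ],
    }


payload = build_payload(
    mount_path="/myapp",
    volume_name="data-vol",
    app_file="kafka-stream-example.py",
    jar_files=[
        "spark-sql-kafka-0-10_2.12-3.3.0.jar",
        "spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar",
        "commons-pool2-2.11.1.jar",
    ],
    conf={"spark.app.name": "SparkStreams", "spark.eventLog.enabled": "true"},
)

print(json.dumps(payload, indent=2))
```

To produce the file itself, write the same dictionary with `json.dump(payload, f, indent=2)` to a file opened as payload.json. Building the jars string from a list helps prevent the missing or misplaced commas that are easy to introduce in a long hand-edited path string.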