Run Spark Streaming applications
Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live streams of data, for example from log files or status update messages. HDFS directories, TCP sockets, and Kafka are some of the supported Spark Streaming data sources. The machine learning and graph processing methods in Spark can even be used on data streams and the processed data can be stored in databases and files.
Spark Streaming takes live input data streams and divides them into batches, which the Spark engine processes to produce the final stream of results, also in batches. For details, see the Apache Spark Streaming Programming Guide.
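As a rough illustration of the batching idea, the following pure-Python sketch splits a stream of log lines into small batches and processes each batch on its own. It is not how Spark is implemented: Spark groups records by time interval, while this sketch groups by record count to stay simple. The names `micro_batches` and `count_errors` and the sample log lines are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def micro_batches(stream: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive batches of up to batch_size records from a stream."""
    it = iter(stream)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return  # the stream is exhausted
        yield batch


def count_errors(batch: List[str]) -> int:
    """Process one batch: count the lines that contain the word ERROR."""
    return sum(1 for line in batch if "ERROR" in line)


# Hypothetical input standing in for a live stream of log messages
log_lines = ["INFO start", "ERROR disk", "INFO ok", "INFO ok", "ERROR net"]

# One result per batch, produced in the order the batches arrive
results = [count_errors(batch) for batch in micro_batches(log_lines, 2)]
print(results)
```

Each batch yields its own result, so the output is itself a sequence of per-batch results rather than a single total.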
Integrating with Kafka
IBM Analytics Engine powered by Apache Spark supports Kafka as a data source for real-time data streaming. The data is processed as it is streamed and can be stored in HDFS, databases or dashboards.
This section shows you how you can leverage Spark Streaming on IBM Analytics Engine powered by Apache Spark with Kafka in a sample application called kafka-stream-example.py.
Sample Python Spark Streaming application:
#!/usr/bin/env python
# coding: utf-8
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
# Create Spark session
spark = SparkSession.builder.getOrCreate()
# Connect to kafka server and read data stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "CHANGEME_KAFKA_SERVER") \
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='CHANGEME_USERNAME' password='CHANGEME_PASSWORD';") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.protocol", "TLSv1.2") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2") \
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS") \
.option("subscribe", "CHANGEME_TOPIC") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
# Write the input data to memory
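query = df.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("testk2s") \
.option("partition.assignment.strategy", "range") \
.start()
# Let the stream run for up to 30 seconds, then stop it and print its final status
query.awaitTermination(30)
query.stop()
print(query.status)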
# Query data
test_result=spark.sql("select * from testk2s")
test_result.show(5)
spark.sql("select count(*) from testk2s").show()
test_result_small = spark.sql("select * from testk2s limit 5")
test_result_small.show()
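The Kafka connection settings in the sample are written inline with CHANGEME placeholders. One possible way to keep them in one place is to build the options as a dictionary, as in the following sketch. The helper names `build_jaas_config` and `kafka_options` are hypothetical; the option keys and values are copied from the sample application. The sketch assumes that the user name and password contain no single quotes.

```python
from typing import Dict


def build_jaas_config(username: str, password: str) -> str:
    """Build the SASL/PLAIN JAAS configuration string used by the sample."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{username}' password='{password}';"
    )


def kafka_options(servers: str, topic: str, username: str, password: str) -> Dict[str, str]:
    """Collect the Kafka source options from the sample into one dictionary."""
    return {
        "kafka.bootstrap.servers": servers,
        "kafka.sasl.jaas.config": build_jaas_config(username, password),
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.ssl.protocol": "TLSv1.2",
        "kafka.ssl.enabled.protocols": "TLSv1.2",
        "kafka.ssl.endpoint.identification.algorithm": "HTTPS",
        "subscribe": topic,
    }


# Placeholder values, as in the sample application
options = kafka_options(
    "CHANGEME_KAFKA_SERVER", "CHANGEME_TOPIC", "CHANGEME_USERNAME", "CHANGEME_PASSWORD"
)
for key, value in options.items():
    print(key, "=", value)
```

In the sample application, a dictionary like this could replace the chain of `.option(...)` calls by passing it as `spark.readStream.format("kafka").options(**options).load()`.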
Running your Spark applications
To run the Spark application kafka-stream-example.py using data that is streamed through Kafka, you need to preload the required Kafka and Spark streaming libraries to Spark.
IBM Analytics Engine powered by Apache Spark offers multiple options to persist any libraries that you might need in your applications, including your application file. For details, see Customizing Spark applications in a service volume instance.
In the following example, you will upload the required libraries and the Spark application file to a volume service instance.
To run the Spark application kafka-stream-example.py:
- Download the following JAR files from Maven (all for Spark version 3.3):
  - spark-sql-kafka: download spark-sql-kafka-0-10_2.12-3.3.0.jar
  - spark-streaming-kafka: download spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar
  - commons-pool2: download commons-pool2-2.11.1.jar
- Upload the JAR files and the Spark application file to a volume service instance:
  - By using the API. For instructions, see Customizing Spark applications in a service volume instance.
  - Through the user interface:
    - From the navigation menu in Cloud Pak for Data, click Services > Instances, find the service volume instance and click it to view the instance details.
    - Click the File browser tab, and upload your Spark application and the Spark Streaming JAR files you downloaded.
- Prepare the Spark application payload.
  You need to define the volumes section in the payload and add the volume service instance and mount details so that the required libraries are loaded before the Spark application starts.
  In the following sample payload.json, the Spark application kafka-stream-example.py and the Kafka libraries are stored in the data-vol volume that is mounted to /myapp. The Python application and the comma-separated list of JARs included in the jars option are automatically transferred to the cluster.
  {
    "application_details": {
      "application": "/myapp/kafka-stream-example.py",
      "arguments": [""],
      "conf": {
        "spark.app.name": "SparkStreams",
        "spark.eventLog.enabled": "true"
      },
      "jars": "/myapp/spark-sql-kafka-0-10_2.12-3.3.0.jar,/myapp/spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar,/myapp/commons-pool2-2.11.1.jar"
    },
    "volumes": [{
      "name": "data-vol",
      "mount_path": "/myapp",
      "source_sub_path": ""
    }]
  }
- Submit the PySpark application. For details, see Submitting Spark jobs via API.
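If you prefer to generate the payload rather than edit it by hand, a minimal sketch along the following lines could assemble the same structure with Python's standard json module. The function name `build_payload` is hypothetical; the field names, file names, volume name, and mount path are taken from the sample payload.json in the steps above.

```python
import json
from typing import Dict, List


def build_payload(app_file: str, jars: List[str], volume_name: str, mount_path: str) -> Dict:
    """Assemble a payload shaped like the sample payload.json."""
    return {
        "application_details": {
            "application": f"{mount_path}/{app_file}",
            "arguments": [""],
            "conf": {
                "spark.app.name": "SparkStreams",
                "spark.eventLog.enabled": "true",
            },
            # The jars option is a single comma-separated string of paths
            "jars": ",".join(f"{mount_path}/{jar}" for jar in jars),
        },
        "volumes": [
            {"name": volume_name, "mount_path": mount_path, "source_sub_path": ""}
        ],
    }


payload = build_payload(
    "kafka-stream-example.py",
    [
        "spark-sql-kafka-0-10_2.12-3.3.0.jar",
        "spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar",
        "commons-pool2-2.11.1.jar",
    ],
    "data-vol",
    "/myapp",
)

# Serialize the payload; the text could be saved as payload.json
payload_json = json.dumps(payload, indent=2)
print(payload_json)
```

Deriving every path from one mount path keeps the application path, the JAR paths, and the volume definition consistent with each other.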
Learn more
- Spark jobs API syntax, parameters and return codes
- Spark application types
- Accessing data from storage
- Persisting Spark applications
- Accessing Spark job driver logs
- Releasing job resources