Run Spark Streaming applications

Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams, for example from log files or status update messages. HDFS directories, TCP sockets, and Kafka are some of the supported Spark Streaming data sources. The machine learning and graph processing methods in Spark can even be applied to data streams, and the processed data can be stored in databases and files.

Spark Streaming takes live input data streams and breaks the streams up into small batches, which the Spark engine processes to generate the final results in batches. For details, see the Apache Spark Streaming Programming Guide.
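The micro-batch model can be sketched in plain Python. This is an illustration of the concept only, not Spark API code: incoming records are collected into fixed-size batches, and each completed batch is handed to the processing step.

```python
# Conceptual sketch of micro-batching: group a continuous stream of
# records into fixed-size batches and process one batch at a time.
def micro_batches(stream, batch_size):
    batch = []
    for record in stream:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch          # hand a completed batch to the engine
            batch = []
    if batch:
        yield batch              # flush the final partial batch

events = ["e1", "e2", "e3", "e4", "e5"]
for b in micro_batches(events, 2):
    print(b)
# ['e1', 'e2']
# ['e3', 'e4']
# ['e5']
```

Spark Streaming applies the same idea at scale: each batch becomes a unit of work that is distributed across the cluster.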

Integrating with Kafka

IBM Analytics Engine powered by Apache Spark supports Kafka as a data source for real-time data streaming. The data is processed as it is streamed and can be stored in HDFS or databases, or visualized in dashboards.

This section shows you how to use Spark Streaming on IBM Analytics Engine powered by Apache Spark with Kafka in a sample application called kafka-stream-example.py.

Sample Python Spark Streaming application:

#!/usr/bin/env python
# coding: utf-8

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Connect to kafka server and read data stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "CHANGEME_KAFKA_SERVER") \
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='CHANGEME_USERNAME' password='CHANGEME_PASSWORD';") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.protocol", "TLSv1.2") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2") \
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS") \
.option("subscribe", "CHANGEME_TOPIC") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df.printSchema()

# Write the input data to an in-memory table
query = df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("testk2s") \
    .option("partition.assignment.strategy", "range") \
    .start()

# Let the query run for up to 30 seconds, then stop it
query.awaitTermination(30)

query.stop()

# Check the query status
print(query.status)

# Query data
test_result = spark.sql("select * from testk2s")
test_result.show(5)

spark.sql("select count(*) from testk2s").show()
test_result_small = spark.sql("select * from testk2s limit 5")
test_result_small.show()

Running your Spark applications

To run the Spark application kafka-stream-example.py using data that is streamed through Kafka, you need to preload the required Kafka and Spark streaming libraries to Spark.

IBM Analytics Engine powered by Apache Spark offers multiple options to persist any libraries that you might need in your applications, including your application file. For details, see Customizing Spark applications in a service volume instance.

In the following example, you upload the required libraries and the Spark application file to a service volume instance.

To run the Spark application kafka-stream-example.py:

  1. Download the following JAR files from Maven:

    • spark-sql-kafka-0-10_2.12-3.3.0.jar
    • spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar
    • commons-pool2-2.11.1.jar

  2. Upload the JAR files and the Spark application file to a service volume instance:

    • By using the API. For instructions, see Customizing Spark applications in a service volume instance.

    • Through the user interface:

      1. From the Cloud Pak for Data navigation menu, click Services > Instances, find the service volume instance, and click it to view the instance details.
      2. Click the File browser tab, and upload your Spark application and the Spark Streaming JAR files you downloaded.
  3. Prepare the Spark application payload.

    You need to define the volumes section in the payload and add the volume service instance and mount details to load the required libraries before the Spark application starts.

    In the following sample payload.json, the Spark application kafka-stream-example.py and the Kafka libraries are stored in the data-vol volume that is mounted to /myapp. The Python application and the comma-separated list of JARs included in the jars option are automatically transferred to the cluster.

    {
        "application_details": { 
            "application": "/myapp/kafka-stream-example.py", 
            "arguments": [""],
            "conf": { 
                "spark.app.name": "SparkStreams",
                "spark.eventLog.enabled": "true" 
            },
            "jars": "/myapp/spark-sql-kafka-0-10_2.12-3.3.0.jar,/myapp/spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar,/myapp/commons-pool2-2.11.1.jar" 
        }, 
        "volumes": \[{ 
            "name": "data-vol", 
            "mount_path": "/myapp", 
            "source_sub_path": "" 
        }]
    }
    
  4. Submit the PySpark application. For details, see Submitting Spark jobs via API.
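As a sketch of this step, the payload can be submitted with Python's standard library. The host, instance ID, token, and endpoint path below are placeholders and assumptions; the actual URL and authentication details are described in Submitting Spark jobs via API.

```python
import json
import urllib.request

# Trimmed version of the payload prepared in step 3
payload = {
    "application_details": {
        "application": "/myapp/kafka-stream-example.py",
        "jars": "/myapp/spark-sql-kafka-0-10_2.12-3.3.0.jar,"
                "/myapp/spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar,"
                "/myapp/commons-pool2-2.11.1.jar",
    },
    "volumes": [{"name": "data-vol", "mount_path": "/myapp", "source_sub_path": ""}],
}

# CHANGEME values are placeholders; the endpoint path is an assumption,
# so check "Submitting Spark jobs via API" for your deployment's URL.
req = urllib.request.Request(
    url="https://CHANGEME_CPD_HOST/v4/analytics_engines/CHANGEME_INSTANCE_ID/spark_applications",
    data=json.dumps(payload).encode("utf-8"),
    headers={
        "Authorization": "Bearer CHANGEME_TOKEN",
        "Content-Type": "application/json",
    },
    method="POST",
)
# urllib.request.urlopen(req) would send the request; it is not called
# here because the placeholder host is unreachable.
print(req.get_method(), req.full_url)
```

The same request can also be made with curl or any HTTP client; the key points are the POST method, the JSON payload, and the bearer token in the Authorization header.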

Parent topic: Getting started with Spark applications