Exécuter des applications Spark Streaming
Spark Streaming permet un traitement évolutif, à haut débit et tolérant aux pannes des flux de données en direct, provenant par exemple de fichiers journaux ou de messages de mise à jour de statut. HDFS Les répertoires, les sockets TCP et Kafka font partie des sources de données prises en charge par Spark Streaming. Les méthodes d'apprentissage automatique et de traitement de graphes dans Spark peuvent même être utilisées sur des flux de données, et les données traitées peuvent être stockées dans des bases de données et des fichiers.
Spark Streaming prend les flux de données en direct et les divise en lots, qui sont traités par le moteur Spark afin de fournir un lot final de résultats. Pour plus de détails, consultez le Guide de programmation Apache Spark en continu.
Intégration avec Kafka
IBM Analytics Engine powered by Apache Spark prend en charge Kafka comme source de données pour la diffusion de données en temps réel. Les données sont traitées au fur et à mesure de leur transmission et peuvent être stockées dans des bases de données ou HDFS des tableaux de bord.
Cette section vous montre comment tirer parti de Spark Streaming sur IBM Analytics Engine powered by Apache Spark avec Kafka dans un exemple d'application appelé kafka-stream-example.py.
Exemple d'application Python Spark Streaming :
#!/usr/bin/env python
# coding: utf-8
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.types import*
import time
#create spark session
spark = SparkSession.builder.getOrCreate()
# Connect to kafka server and read data stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "CHANGEME_KAFKA_SERVER") \
.option("kafka.sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='CHANGEME_USERNAME' password='CHANGEME_PASSWORD';") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.protocol", "TLSv1.2") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2") \
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS") \
.option("subscribe", "CHANGEME_TOPIC") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
# Write the input data to memory
query = df.writeStream.outputMode("append").format("memory").queryName("testk2s").option("partition.assignment.strategy", "range").start()
query.awaitTermination(30)
query.stop()
query.status
# Query data
test_result=spark.sql("select * from testk2s")
test_result.show(5)
spark.sql("select count(*) from testk2s").show()
test_result_small = spark.sql("select * from testk2s limit 5")
test_result_small.show()
Exécution de vos applications Spark
Pour exécuter l'application Spark à kafka-stream-example.py l'aide des données diffusées en continu via Kafka, vous devez précharger les bibliothèques requises Kafka et Spark Streaming dans Spark.
IBM Analytics Engine powered by Apache Spark offre plusieurs options pour conserver toutes les bibliothèques dont vous pourriez avoir besoin dans vos applications, y compris votre fichier d'application. Pour plus d'informations, consultez la section Personnalisation des applications Spark dans une instance de volume de service.
Dans l'exemple suivant, vous allez télécharger les bibliothèques requises et le fichier d'application Spark vers une instance de service de volume.
Pour exécuter l'application Spark kafka-stream-example.py :
Téléchargez les paquets Python suivants depuis Maven :
spark-sql-kafka; Version Spark 3.3; télécharger depuis spark-sql-kafka-0-10_2.12-3.3.0.jarspark-streaming-kafka; Version Spark 3.3; télécharger depuis spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jarcommons-pool2; Version Spark 3.3; télécharger depuis commons-pool2-2.11.1.jar
Téléchargez les paquets et le fichier d'application Spark vers une instance de service de volume :
En utilisant l'API. Pour obtenir des instructions, consultez la section Personnalisation des applications Spark dans une instance de volume de service.
Via l'interface utilisateur :
- Dans le menu
de navigation, cliquez Cloud Pak for Data sur Services > Instances, recherchez l'instance de volume de service et cliquez dessus pour afficher les détails de l'instance.
- Cliquez sur l'onglet Navigateur de fichiers, puis téléchargez votre application Spark et les fichiers JAR Spark Streaming que vous avez téléchargés.
- Dans le menu
Préparez la charge utile de l'application Spark.
Vous devez définir la
volumessection dans la charge utile et ajouter l'instance de service de volume et les détails de montage pour charger les paquets Python requis avant le démarrage de l'application Spark.Dans l'exemple suivant
payload.json, l'applicationkafka-stream-example.pySpark et les Kafka bibliothèques sont stockées dans ledata-volvolume monté sur/myapp. L'application Python et la liste des fichiers JAR séparés par des virgules incluse dansjarsl'option sont automatiquement transférées vers le cluster.{ "application_details": { "application": "/myapp/kafka-stream-example.py", "arguments": [""], "conf": { "spark.app.name": "SparkStreams", "spark.eventLog.enabled": "true" }, "jars": "/myapp/spark-sql-kafka-0-10_2.12-3.3.0.jar,/myapp/spark-streaming-kafka-0-10-assembly_2.12-3.3.0.jar,/myapp/commons-pool2-2.11.1.jar" }, "volumes": \[{ "name": "data-vol", "mount_path": "/myapp", "source_sub_path": "" }\] }Soumettez la PySpark demande. Pour plus d'informations, consultez la section Soumission de tâches Spark via l'API.