访问存储器中的数据
使用 Spark 作业 API 时,可以将应用程序作业文件和数据文件存储在可通过 IBM Cloud Pak for Data 卷 API 管理的存储卷中,或者,您可以供应 IBM Cloud Object Storage 实例。
文件可以位于 IBM Cloud Pak for Data 集群上的文件存储系统中,也可以位于 IBM Cloud Object Storage 中。请参阅存储注意事项。
使用外部卷中的文件
在使用 Analytics Engine powered by Apache Spark 运行的 Spark 应用程序中,引用 Spark 作业文件、输入数据或输出数据的常用方法是通过外部存储卷进行引用,这些卷可使用 IBM Cloud Pak for Data 卷 API 来管理。
可以使用下列外部卷:
- 外部 NFS 存储卷
- 请参阅设置外部 NFS 服务器的先决条件中的“设置 NFS 卷的先决条件”。
- 请参阅在外部 NFS 服务器上创建卷,以了解如何在外部 NFS 服务器上创建卷。
- 现有的持久卷声明
- 请参阅在持久卷声明中创建卷。
-
您创建的新卷
通过使用卷 API,您可以创建一个或多个所需大小的卷,使用卷 API 来上载数据和应用程序,然后在 Spark 作业 API 中传递卷标识作为参数。有关详细信息,请参阅使用卷 REST API 管理持久卷实例。
以下 cURL 代码片段显示如何创建文件服务器,上载文件,下载文件,然后停止文件服务器。在片段中,
7a8e1ca7a6854e35b9c898e985075ed7/41bba1ac-d013-435e-b4df-d5ddc26a3259/1.json是该卷中的位置,/nginx_data/1.json是要上载的本地文件。在路径中的目录与文件名之间,请使用%2F。请注意,不应使用实例主存储卷来上载作业或数据。例如,假设您已创建名为volume1的卷来上载数据和作业文件。在开始之前,需要获取该服务实例的访问令牌。请输入以下 cURL 命令,它会返回含访问令牌的 JSON 响应。请插入 IBM Cloud Pak for Data 集群 URL、用户名和密码。
curl -k -X GET https://<CloudPakforData_URL>/v1/preauth/validateAuth -H 'content-type: application/json' -H 'username: <YOUR_USERNAME>' -H 'password: <YOUR_PASSWORD>'利用返回的访问令牌,可以发出下列 cURL 命令:
- 要创建名为
volume1的新卷:curl -ik -X POST https://<CloudPakForData_URL/zen-data/v3/service_instances -H "Authorization: Bearer <ACCESS_TOKEN>" -H 'Content-Type: application/json' -d '{"addon_type":"volumes","addon_version":"-","create_arguments":{"metadata":{"storageClass":"<storage_class>","storageSize":"<space-allocated-to-volume>"}},"namespace":"<project_name>","display_name":"<Volume_name>"}' - 要启动文件服务器:
curl -k -i -X POST 'https://<CloudPakforData_URL>/zen-data/v1/volumes/volume_services/volume1' -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{}' -H 'Content-Type: application/json' -H 'cache-control: no-cache' - 要上载文件:
curl -k -i -X PUT 'https://<CloudPakforData_URL>/zen-volumes/volume1/v1/volumes/files/7a8e1ca7a6854e35b9c898e985075ed7%2F41bba1ac-d013-435e-b4df-d5ddc26a3259%2F1.json' -H "Authorization: Bearer <ACCESS_TOKEN>" -H 'cache-control: no-cache' -H 'content-type: multipart/form-data' -F 'upFile=@/nginx_data/1.json' - 要上载并解压缩 tar 文件:
curl -k -i -X PUT 'https://<CloudPakforData_URL>/zen-volumes/volume1/v1/volumes/files/<YOUR_DIRECTORY>%2F?extract=true' -H "Authorization: Bearer <ACCESS_TOKEN>" -H 'cache-control: no-cache' -H 'content-type: multipart/form-data' -F 'upFile=@</local/path/file.tar.gz>' - 要下载文件:
curl -k -i -X GET 'https://<CloudPakforData_URL>/zen-volumes/volume1/v1/volumes/files/7a8e1ca7a6854e35b9c898e985075ed7%2F41bba1ac-d013-435e-b4df-d5ddc26a3259%2F1.json' -H "Authorization: Bearer <ACCESS_TOKEN>" -H 'cache-control: no-cache' - 要停止文件服务器:
curl -k -i -X DELETE 'https://<CloudPakforData_URL>/zen-data/v1/volumes/volume_services/volume1' -H "Authorization: Bearer <ACCESS_TOKEN>"
- 要创建名为
使用多个存储卷中的文件
在创建 Spark 作业有效内容时,可以使用多个存储卷。
以下示例显示上载至 vol1 卷的 customApps 目录下的 Spark 应用程序,该卷以 /myapp 形式装载在 Spark 集群上。用户数据位于 vol2 卷,该卷以 /data 形式装载在 Spark 集群上。
{
"engine": {
"type": "spark",
"conf": {
"spark.executor.extraClassPath":"/myapp/*",
"spark.driver.extraClassPath":"/myapp/*"
},
"volumes": [{ "volume_name": "vol1", "source_path": "customApps", "mount_path": "/myapp" },{ "volume_name": "vol2", "source_path": "", "mount_path": "/data" }]
},
"application_arguments": ["12"],
"application_jar": "/myapp/spark-examples_2.11-2.4.3.jar",
"main_class": "org.apache.spark.examples.SparkPi"
}
使用 Object Storage 中的文件
可以将作业文件和数据存储在与 S3 兼容的 Object Storage 存储区。下列步骤描述对于 IBM Cloud Object Storage 存储区如何执行此操作。
-
创建应用程序,例如 Python 程序文件 cosExample.py:
from __future__ import print_function import sys import calendar import time from pyspark.sql import SparkSession if __name__ == "__main__": if len(sys.argv) != 5: print("Usage: cosExample <access-key> <secret-key> <endpoint> <bucket>", file=sys.stderr) sys.exit(-1) spark = SparkSession.builder.appName("CosExample").getOrCreate() prefix = "fs.cos.llservice" hconf = spark.sparkContext._jsc.hadoopConfiguration() hconf.set(prefix +".endpoint", sys.argv[3]) hconf.set(prefix + ".access.key", sys.argv[1]) hconf.set(prefix + ".secret.key", sys.argv[2]) data = [1, 2, 3, 4, 5, 6] distData = spark.sparkContext.parallelize(data) distData.count() path = "cos://{}.llservice/{}".format(sys.argv[4], calendar.timegm(time.gmtime())) distData.saveAsTextFile(path) rdd = spark.sparkContext.textFile(path) print ("output rdd count: {}". format(rdd.count())) spark.stop() -
装入作业文件。要从外部卷装入作业文件,请将 cosExample.py 上载至存储卷
vol1中的customApps目录下,该卷以/myapp形式装载在 Spark 集群中:{ "engine": { "type": "spark", "volumes": [{ "volume_name": "vol1", "source_path": "customApps", "mount_path": "/myapp" }] }, "application_arguments": ["<ACCESS_KEY>", "<COS_SECRET_KEY>","<COS_ENDPOINT>", "<BUCKET_NAME>"], "application_jar": "/myapp/cosExample.py", "main_class": "org.apache.spark.deploy.SparkSubmit" } -
或者,要从 IBM Cloud Object Storage 存储区装入作业文件,请从 IBM Cloud Object Storage 服务 (
<COS_SERVICE_NAME>) 中的存储区<BUCKET_NAME>上载<OBJECT_NAME>中的作业文件:{ "engine": { "type": "spark", "template_id": "<template-id>", "conf": { "spark.app.name": "MyJob", "spark.hadoop.fs.cos.<REPLACE_WITH_COS_SERVICE_NAME>.endpoint":"<COS_ENDPOINT>", "spark.hadoop.fs.cos.<REPLACE_WITH_COS_SERVICE_NAME>.secret.key":"<COS_SECRET_KEY>", "spark.hadoop.fs.cos.<REPLACE_WITH_COS_SERVICE_NAME>.access.key":"<COS_ACCESS_KEY>" }, "size": { "num_workers": 1, "worker_size": { "cpu": 1, "memory": "1g" }, "driver_size": { "cpu": 1, "memory": "1g" } } }, "application_arguments": ["cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<OBJECT_NAME>"], "application_jar": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<REPLACE_WITH_OBJECT_NAME>", "main_class": "org.apache.spark.deploy.SparkSubmit" }