交互式运行Spark应用程序
您可以通过利用内核API以交互方式运行Spark应用程序。
内核即服务提供:
- Jupyter内核作为第一类实体
- 每个内核一个专用集群
- 集群和内核通过用户库进行定制
使用内核API
您可以通过内核API交互式地运行Spark应用程序。 每个应用程序都在专用集群中的内核中运行。 通过API传递的任何配置设置都会覆盖默认配置。
要交互式运行 Spark 应用程序:
若尚未持有访问令牌,请生成一个。 请参阅生成访问令牌。
将令牌导出到变量中:
export TOKEN=<token generated>供应 Analytics Engine powered by Apache Spark 实例。 您需要在项目或部署空间中拥有管理员角色才能创建实例。 请参阅供应实例。
获取实例的Spark内核端点。
- 在导航菜单
中 Cloud Pak for Data ,点击 “服务”>“实例 ”,找到目标实例并单击以查看实例详细信息。
- 在访问信息下,复制并保存Spark内核端点。
- 在导航菜单
使用生成的内核端点和访问令牌创建内核。此示例包含必要的最小强制参数:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala" }'
创建内核JSON及其支持的参数
若使用内核服务启动创建内核的REST API,可在有效负载中添加高级配置。
请考虑以下示例有效负载和参数列表,这些参数可通过创建内核 API 传递:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
"name": "scala",
"kernel_size": {
"cpu": 1,
"memory": "1g"
},
"engine": {
"type": "spark",
"conf": {
"spark.ui.reverseProxy": "false",
"spark.eventLog.enabled": "false"
},
"size": {
"num_workers": "2",
"worker_size": {
"cpu": 1,
"memory": "1g"
}
}
}
}'
回复:
{
"id": "<kernel_id>",
"name": "scala",
"connections": 0,
"last_activity": "2021-07-16T11:06:26.266275Z",
"execution_state": "starting"
}
通过自定义Spark运行时调用Spark内核API
更改Spark运行时版本的输入有效负载示例:
{
"name": "python310",
"kernel_size": {
"cpu": 1,
"memory": "1g"
},
"engine": {
"type": "spark",
"runtime": {
"spark_version": "3.4"
},
"size": {
"num_workers": "2",
"worker_size": {
"cpu": 1,
"memory": "1g"
}
}
}
}
Spark内核API参数
以下是您可以在内核API中使用的参数:
| 名称 | 必选/可选 | 类型 | 描述 |
|---|---|---|---|
| Name | 必需 | 字符串 | 指定内核名称。 支持的值为 scala, r, r42, python39, python310 |
| 引擎 | 可选 | 键值对 | 指定包含配置和版本信息的Spark运行时环境 |
| engine.runtime.spark_version | 可选 | 字符串 | 指定用于内核的Spark运行时版本。 IBM Cloud Pak for Data 支持Spark 3.4。 |
| Type | 若指定引擎则需提供 | 字符串 | 指定内核运行时类型。 目前仅 spark 支持。 |
| conf | 可选 | 键值 JSON 对象 | 指定覆盖预定义值的Spark配置参数 |
| env | 可选 | 键值 JSON 对象 | 指定作业所需的Spark环境变量 |
| Size | 可选 | 接受参数 num_workers、 worker_size 和 master_size |
|
| num_workers | 若指定大小,则为必填项 | 整数 | 指定 Spark 集群中的工作程序节点数量。 num_workers 是您所需的执行程序数。 缺省值为每个工作程序节点对应 1 个执行程序。 支持的最大执行程序数为 50。 |
| worker_size | 可选 | 使用参数 cpu 和 memory。 |
|
| CpU | 必需(如果已指定 worker_size)。 |
整数 | 指定工作程序节点的 CPU 数量。 缺省值为 1 个 CPU。 最多支持10个CPU |
| memory | 必需(如果已指定 worker_size)。 |
整数 | 指定每个工作程序节点的内存量。 缺省值为 1 GB。 最大值为40 GB |
| 主尺寸 | 可选 | 接收参数 cpu 和 memory |
|
| CpU | 如果 master_size 指定了,则需要 |
整数 | 指定主节点的CPU资源量。 缺省值为 1 个 CPU。 最多支持10个CPU |
| memory | 如果 master_size 指定了,则需要 |
整数 | 指定每个主节点的内存大小。 缺省值为 1 GB。 最大值为40 GB |
| 内核大小 | 可选 | 接收参数 cpu 和 memory |
|
| CpU | 如果 kernel_size 指定了,则需要 |
整数 | 指定内核节点的CPU分配量。 缺省值为 1 个 CPU。 最多支持10个CPU |
| memory | 如果 kernel_size 指定了,则需要 |
整数 | 指定每个内核节点的内存量。 缺省值为 1 GB。 最大值为40 GB |
查看内核状态
创建Spark内核后,您可以查看内核详情。
要查看内核状态,请输入:
curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"
响应示例:
{
"id": "<kernel_id>",
"name": "scala",
"connections": 0,
"last_activity": "2021-07-16T11:06:26.266275Z",
"execution_state": "starting"
}
删除内核
您可以通过输入以下命令删除Spark内核:
curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"
列出内核
输入以下命令即可列出所有活动的Spark内核:
curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}"
使用 Spark 内核
您可以使用由提供的内核 Analytics Engine Powered by Apache Spark API。 通过WebSocket连接创建内核、检查内核状态以及删除内核。
以下 Python 示例代码使用 Tornado 库向 Jupyter 内核网关服务发起 HTTP 和 WebSocket 调用。 要运行此示例代码,您需要一个已安装 Tornado Python 包的运行时环境。
要创建一个Spark应用程序:
安装 pip。 若未安装,请使用以下命令安装 Tornado Python 包:
yum install python3-pip -y ; pip3 install tornado在工作目录中创建一个名为 的文件
client.py,其中包含以下代码:以下示例代码创建一个Spark Scala 内核,并将 Scala 代码提交至该内核执行:
from uuid import uuid4 import requests import json from tornado import gen from tornado.escape import json_encode, json_decode, url_escape from tornado.httpclient import AsyncHTTPClient, HTTPRequest from tornado.ioloop import IOLoop from tornado.websocket import websocket_connect @gen.coroutine def main(): url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/icp4d-api/v1/authorize" payload = json.dumps({ "username": "magonzalez", "password": "magonzalez" }) headers = { 'cache-control': 'no-cache', 'Content-Type': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) response_json = json.loads(response.text) token = response_json['token'] kg_http_url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels" kg_ws_url = "wss://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels" headers = {"Authorization": 'Bearer {}'.format(token) , "Content-Type": "application/json"} validate_cert = False kernel_name="scala" kernel_payload={"name":"scala","kernel_size":{"cpu":3}} print(kernel_payload) code = """ print(s"Spark Version: ${sc.version}") print(s"Application Name: ${sc.appName}") print(s"Application ID: ${sc.applicationId}") import org.apache.spark.sql.SQLContext; val sqlContext = new SQLContext(sc); val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv"); data_df_0.show(5) """ print("Using kernel gateway URL: {}".format(kg_http_url)) print("Using kernel websocket URL: {}".format(kg_ws_url)) # Remove "/" if exists in JKG url's if kg_http_url.endswith("/"): kg_http_url=kg_http_url.rstrip('/') if kg_ws_url.endswith("/"): kg_ws_url=kg_ws_url.rstrip('/') client = AsyncHTTPClient() # Create kernel # POST /api/kernels print("Creating kernel {}...".format(kernel_name)) response = yield client.fetch( kg_http_url, method='POST', headers = headers, validate_cert=validate_cert, body=json_encode(kernel_payload), connect_timeout=240, request_timeout=240 ) kernel = json_decode(response.body) kernel_id = kernel['id'] print("Created kernel {0}.".format(kernel_id)) # Connect to kernel websocket # GET /api/kernels/<kernel-id>/channels # Upgrade: websocket # Connection: Upgrade print("Connecting to kernel websocket...") ws_req = HTTPRequest(url='{}/{}/channels'.format( kg_ws_url, url_escape(kernel_id) ), headers = headers, validate_cert=validate_cert, connect_timeout=240, request_timeout=240 ) ws = yield websocket_connect(ws_req) print("Connected to kernel websocket.") # Submit code to websocket on the 'shell' channel print("Submitting code: \n{}\n".format(code)) msg_id = uuid4().hex req = json_encode({ 'header': { 'username': '', 'version': '5.0', 'session': '', 'msg_id': msg_id, 'msg_type': 'execute_request' }, 'parent_header': {}, 'channel': 'shell', 'content': { 'code': code, 'silent': False, 'store_history': False, 'user_expressions': {}, 'allow_stdin': False }, 'metadata': {}, 'buffers': {} }) # Send an execute request ws.write_message(req) print("Code submitted. Waiting for response...") # Read websocket output until kernel status for this request becomes 'idle' kernel_idle = False while not kernel_idle: msg = yield ws.read_message() msg = json_decode(msg) msg_type = msg['msg_type'] print ("Received message type: {}".format(msg_type)) if msg_type == 'error': print('ERROR') print(msg) break # evaluate messages that correspond to our request if 'msg_id' in msg['parent_header'] and \ msg['parent_header']['msg_id'] == msg_id: if msg_type == 'stream': print(" Content: {}".format(msg['content']['text'])) elif msg_type == 'status' and \ msg['content']['execution_state'] == 'idle': kernel_idle = True # close websocket ws.close() # Delete kernel # DELETE /api/kernels/<kernel-id> print("Deleting kernel...") yield client.fetch( '{}/{}'.format(kg_http_url, kernel_id), method='DELETE', headers = headers, validate_cert=validate_cert, ) print("Deleted kernel {0}.".format(kernel_id)) if __name__ == '__main__': IOLoop.current().run_sync(main)其中:
运行脚本文件:
python client.py
以下代码片段展示了如何在 client.py 文件中修改内核名称、内核有效载荷和代码变量,适用于 Python 和 R 内核:
Python 3.10:
kernel_name="python310" kernel_payload={"name":"python310"} print(kernel_payload) code = '\n'.join(( "print(\"Spark Version: {}\".format(sc.version))", "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))", "print(\"Application ID: {} \".format(sc._jsc.sc().applicationId()))", "sc.parallelize([1,2,3,4,5]).count()" ))R 4.2:
kernel_name="r42" kernel_payload={"name":"r42"} code = """ cat("Spark Version: ", sparkR.version()) conf = sparkR.callJMethod(spark, "conf") cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name")) cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id")) df <- as.DataFrame(list(1,2,3,4,5)) cat(count(df)) """