交互式运行Spark应用程序

您可以通过利用内核API以交互方式运行Spark应用程序。

内核即服务提供:

  • Jupyter内核作为第一类实体
  • 每个内核一个专用集群
  • 集群和内核通过用户库进行定制

使用内核API

您可以通过内核API交互式地运行Spark应用程序。 每个应用程序都在专用集群中的内核中运行。 通过API传递的任何配置设置都会覆盖默认配置。

要交互式运行 Spark 应用程序:

  1. 若尚未持有访问令牌,请生成一个。 请参阅生成访问令牌

  2. 将令牌导出到变量中:

    export TOKEN=<token generated>
    
  3. 供应 Analytics Engine powered by Apache Spark 实例。 您需要在项目或部署空间中拥有管理员角色才能创建实例。 请参阅供应实例

  4. 获取实例的Spark内核端点。

    1. 在导航菜单 Cloud Pak for Data 导航菜单 中 Cloud Pak for Data ,点击 “服务”>“实例 ”,找到目标实例并单击以查看实例详细信息。
    2. 在访问信息下,复制并保存Spark内核端点。
  5. 使用生成的内核端点和访问令牌创建内核。此示例包含必要的最小强制参数:

    curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala" }'
    

创建内核JSON及其支持的参数

若使用内核服务启动创建内核的REST API,可在有效负载中添加高级配置。

请考虑以下示例有效负载和参数列表,这些参数可通过创建内核 API 传递:

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
  "name": "scala",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "conf": {
      "spark.ui.reverseProxy": "false",
      "spark.eventLog.enabled": "false"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}'

回复:

{
    "id": "<kernel_id>",
    "name": "scala",
    "connections": 0,
    "last_activity": "2021-07-16T11:06:26.266275Z",
    "execution_state": "starting"
}

通过自定义Spark运行时调用Spark内核API

更改Spark运行时版本的输入有效负载示例:

{
  "name": "python310",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "runtime": {
      "spark_version": "3.4"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}

Spark内核API参数

以下是您可以在内核API中使用的参数:

名称 必选/可选 类型 描述
Name 必需 字符串 指定内核名称。 支持的值为 scala, r, r42, python39, python310
引擎 可选 键值对 指定包含配置和版本信息的Spark运行时环境
engine.runtime.spark_version 可选 字符串 指定用于内核的Spark运行时版本。 IBM Cloud Pak for Data 支持Spark 3.4。
Type 若指定引擎则需提供 字符串 指定内核运行时类型。 目前仅 spark 支持。
conf 可选 键值 JSON 对象 指定覆盖预定义值的Spark配置参数
env 可选 键值 JSON 对象 指定作业所需的Spark环境变量
Size 可选 接受参数 num_workersworker_sizemaster_size
num_workers 若指定大小,则为必填项 整数 指定 Spark 集群中的工作程序节点数量。 num_workers 是您所需的执行程序数。 缺省值为每个工作程序节点对应 1 个执行程序。 支持的最大执行程序数为 50。
worker_size 可选 使用参数 cpumemory
CpU 必需(如果已指定 worker_size)。 整数 指定工作程序节点的 CPU 数量。 缺省值为 1 个 CPU。 最多支持10个CPU
memory 必需(如果已指定 worker_size)。 整数 指定每个工作程序节点的内存量。 缺省值为 1 GB。 最大值为40 GB
主尺寸 可选 接收参数 cpumemory
CpU 如果 master_size 指定了,则需要 整数 指定主节点的CPU资源量。 缺省值为 1 个 CPU。 最多支持10个CPU
memory 如果 master_size 指定了,则需要 整数 指定每个主节点的内存大小。 缺省值为 1 GB。 最大值为40 GB
内核大小 可选 接收参数 cpumemory
CpU 如果 kernel_size 指定了,则需要 整数 指定内核节点的CPU分配量。 缺省值为 1 个 CPU。 最多支持10个CPU
memory 如果 kernel_size 指定了,则需要 整数 指定每个内核节点的内存量。 缺省值为 1 GB。 最大值为40 GB

查看内核状态

创建Spark内核后,您可以查看内核详情。

要查看内核状态,请输入:

curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"

响应示例:

{
  "id": "<kernel_id>",
  "name": "scala",
  "connections": 0,
  "last_activity": "2021-07-16T11:06:26.266275Z",
  "execution_state": "starting"
}

删除内核

您可以通过输入以下命令删除Spark内核:

curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"

列出内核

输入以下命令即可列出所有活动的Spark内核:

curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}"

使用 Spark 内核

您可以使用由提供的内核 Analytics Engine Powered by Apache Spark API。 通过WebSocket连接创建内核、检查内核状态以及删除内核。

以下 Python 示例代码使用 Tornado 库向 Jupyter 内核网关服务发起 HTTP 和 WebSocket 调用。 要运行此示例代码,您需要一个已安装 Tornado Python 包的运行时环境。

要创建一个Spark应用程序:

  1. 安装 pip。 若未安装,请使用以下命令安装 Tornado Python 包:

    yum install  python3-pip -y ; pip3 install tornado
    
  2. 在工作目录中创建一个名为 的文件 client.py ,其中包含以下代码:

    以下示例代码创建一个Spark Scala 内核,并将 Scala 代码提交至该内核执行:

    from uuid import uuid4
    import requests
    import json
    from tornado import gen
    from tornado.escape import json_encode, json_decode, url_escape
    from tornado.httpclient import AsyncHTTPClient, HTTPRequest
    from tornado.ioloop import IOLoop
    from tornado.websocket import websocket_connect
    @gen.coroutine
    def main():
        url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/icp4d-api/v1/authorize"
    
        payload = json.dumps({
        "username": "magonzalez",
        "password": "magonzalez"
        })
        headers = {
        'cache-control': 'no-cache',
        'Content-Type': 'application/json'
        }
    
        response = requests.request("POST", url, headers=headers, data=payload)
    
        response_json = json.loads(response.text)
        token = response_json['token']
    
        kg_http_url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels"
        kg_ws_url = "wss://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels"
        headers = {"Authorization": 'Bearer {}'.format(token) , "Content-Type": "application/json"}
        validate_cert = False
        kernel_name="scala"
        kernel_payload={"name":"scala","kernel_size":{"cpu":3}}
        print(kernel_payload)
        code = """
            print(s"Spark Version: ${sc.version}")
            print(s"Application Name: ${sc.appName}")
            print(s"Application ID: ${sc.applicationId}")
            import org.apache.spark.sql.SQLContext;
            val sqlContext = new SQLContext(sc);
            val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv");
            data_df_0.show(5)
        """
        print("Using kernel gateway URL: {}".format(kg_http_url))
        print("Using kernel websocket URL: {}".format(kg_ws_url))
        # Remove "/" if exists in JKG url's
        if kg_http_url.endswith("/"):
            kg_http_url=kg_http_url.rstrip('/')
        if kg_ws_url.endswith("/"):
            kg_ws_url=kg_ws_url.rstrip('/')
        client = AsyncHTTPClient()
        # Create kernel
        # POST /api/kernels
        print("Creating kernel {}...".format(kernel_name))
        response = yield client.fetch(
            kg_http_url,
            method='POST',
            headers = headers,
            validate_cert=validate_cert,
            body=json_encode(kernel_payload),
            connect_timeout=240,
            request_timeout=240
        )
        kernel = json_decode(response.body)
        kernel_id = kernel['id']
        print("Created kernel {0}.".format(kernel_id))
        # Connect to kernel websocket
        # GET /api/kernels/<kernel-id>/channels
        # Upgrade: websocket
        # Connection: Upgrade
        print("Connecting to kernel websocket...")
        ws_req = HTTPRequest(url='{}/{}/channels'.format(
            kg_ws_url,
            url_escape(kernel_id)
        ),
            headers = headers,
            validate_cert=validate_cert,
            connect_timeout=240,
            request_timeout=240
        )
        ws = yield websocket_connect(ws_req)
        print("Connected to kernel websocket.")
        # Submit code to websocket on the 'shell' channel
        print("Submitting code: \n{}\n".format(code))
        msg_id = uuid4().hex
        req = json_encode({
            'header': {
                'username': '',
                'version': '5.0',
                'session': '',
                'msg_id': msg_id,
                'msg_type': 'execute_request'
            },
            'parent_header': {},
            'channel': 'shell',
            'content': {
                'code': code,
                'silent': False,
                'store_history': False,
                'user_expressions': {},
                'allow_stdin': False
            },
            'metadata': {},
            'buffers': {}
        })
        # Send an execute request
        ws.write_message(req)
        print("Code submitted. Waiting for response...")
        # Read websocket output until kernel status for this request becomes 'idle'
        kernel_idle = False
        while not kernel_idle:
            msg = yield ws.read_message()
            msg = json_decode(msg)
            msg_type = msg['msg_type']
            print ("Received message type: {}".format(msg_type))
            if msg_type == 'error':
                print('ERROR')
                print(msg)
                break
            # evaluate messages that correspond to our request
            if 'msg_id' in msg['parent_header'] and \
                            msg['parent_header']['msg_id'] == msg_id:
                if msg_type == 'stream':
                    print("  Content: {}".format(msg['content']['text']))
                elif msg_type == 'status' and \
                                msg['content']['execution_state'] == 'idle':
                    kernel_idle = True
        # close websocket
        ws.close()
        # Delete kernel
        # DELETE /api/kernels/<kernel-id>
        print("Deleting kernel...")
        yield client.fetch(
            '{}/{}'.format(kg_http_url, kernel_id),
            method='DELETE',
            headers = headers,
            validate_cert=validate_cert,
        )
        print("Deleted kernel {0}.".format(kernel_id))
    if __name__ == '__main__':
        IOLoop.current().run_sync(main)
    

    其中:

    • <token> 是您在 《使用内核API》 中生成的访问令牌。
    • <kernel_endpoint> 是你在 《使用内核API 》中获取的Spark内核端点。 内核端点的示例: https://<cp4d_route>/v4/analytics_engines/<instance_id>/jkg/api/kernels.
    • <ws_kernel_endpoint> 是通过将Spark内核端点的前缀从https改为wss创建的端点 WebSocket。
  3. 运行脚本文件:

    python client.py
    

以下代码片段展示了如何在 client.py 文件中修改内核名称、内核有效载荷和代码变量,适用于 Python 和 R 内核:

  • Python 3.10:

    kernel_name="python310"
    kernel_payload={"name":"python310"}
    print(kernel_payload)
    code = '\n'.join(( "print(\"Spark Version: {}\".format(sc.version))", "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))", "print(\"Application ID: {} \".format(sc._jsc.sc().applicationId()))", "sc.parallelize([1,2,3,4,5]).count()" ))
    
  • R 4.2:

    kernel_name="r42"
    kernel_payload={"name":"r42"}
    code = """
    cat("Spark Version: ", sparkR.version())
    conf = sparkR.callJMethod(spark, "conf")
    cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name"))
    cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id"))
    df <- as.DataFrame(list(1,2,3,4,5))
    cat(count(df))
    """