Running Spark applications interactively
You can run your Spark applications interactively by leveraging the Kernel API.
Kernel as a service provides:
- Jupyter kernels as a first-class entity
- A dedicated cluster per kernel
- Clusters and kernels customized with user libraries
Using the Kernel API
You can run a Spark application interactively by using the Kernel API. Each application runs in a kernel in a dedicated cluster. Any configuration settings that you pass through the API override the default configurations.
To run a Spark application interactively:
- Generate an access token if you do not have one. See Generate an access token.
- Export the token into a variable:
export TOKEN=<token generated>
- Provision an Analytics Engine powered by Apache Spark instance. You need the Administrator role in the project or in the deployment space to provision an instance. See Provisioning an instance.
- Get the Spark kernel endpoint for the instance.
  - From the navigation menu in Cloud Pak for Data, click Services > Instances, find the instance, and click it to view the instance details.
  - Under Access information, copy and save the Spark kernel endpoint.
- Create a kernel by using the kernel endpoint and the access token that you generated. This example includes only the mandatory parameters; a Python equivalent is sketched after this procedure:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala" }'
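If you prefer to call the API from Python instead of curl, the following is a minimal sketch that uses the requests library. The TOKEN and KERNEL_ENDPOINT environment variables are assumptions for this example; set them to the values that you collected in the previous steps.

import os
import requests

# Assumed environment variables; set them to the token and kernel endpoint from the previous steps.
kernel_endpoint = os.environ["KERNEL_ENDPOINT"]
token = os.environ["TOKEN"]

headers = {
    "Authorization": "Bearer {}".format(token),
    "Content-Type": "application/json",
}

# Create a Scala kernel with the minimal payload.
response = requests.post(
    kernel_endpoint,
    headers=headers,
    json={"name": "scala"},
    verify=False,  # equivalent of curl -k; enable certificate verification in production
)
response.raise_for_status()
print(response.json())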
Create kernel JSON and supported parameters
When you call the create kernel REST API of the Kernel service, you can add advanced configurations to the payload.
Consider the following sample payload and the list of parameters that you can pass in the create kernel API:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
  "name": "scala",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "conf": {
      "spark.ui.reverseProxy": "false",
      "spark.eventLog.enabled": "false"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}'
Response:
{
  "id": "<kernel_id>",
  "name": "scala",
  "connections": 0,
  "last_activity": "2021-07-16T11:06:26.266275Z",
  "execution_state": "starting"
}
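The same advanced request can be issued from Python. The following is a minimal sketch, again assuming the TOKEN and KERNEL_ENDPOINT environment variables from the earlier steps; it reuses the payload shown above and reads the kernel ID from the response.

import os
import requests

headers = {
    "Authorization": "Bearer {}".format(os.environ["TOKEN"]),
    "Content-Type": "application/json",
}

# Advanced create-kernel payload: driver (kernel) resources plus a two-worker Spark cluster.
payload = {
    "name": "scala",
    "kernel_size": {"cpu": 1, "memory": "1g"},
    "engine": {
        "type": "spark",
        "conf": {
            "spark.ui.reverseProxy": "false",
            "spark.eventLog.enabled": "false",
        },
        "size": {
            "num_workers": "2",
            "worker_size": {"cpu": 1, "memory": "1g"},
        },
    },
}

response = requests.post(
    os.environ["KERNEL_ENDPOINT"],
    headers=headers,
    json=payload,
    verify=False,  # equivalent of curl -k
)
kernel_id = response.json()["id"]
print("Created kernel {}".format(kernel_id))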
Spark kernel API with a custom Spark runtime
An example of an input payload for changing the Spark runtime version:
{
  "name": "python310",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "runtime": {
      "spark_version": "3.4"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}
Spark kernel API parameters
These are the parameters that you can use in the Kernel API:
Name | Required/Optional | Type | Description |
---|---|---|---|
name | Required | String | Specifies the kernel name. Supported values are scala, r, r42, python39, python310. |
engine | Optional | Key-value pairs | Specifies the Spark runtime with configuration and version information. |
engine.runtime.spark_version | Optional | String | Specifies the Spark runtime version to be used for the kernel. IBM Cloud Pak for Data supports Spark 3.4. |
type | Required if engine is specified | String | Specifies the kernel runtime type. Currently, only spark is supported. |
conf | Optional | Key-value JSON object | Specifies the Spark configuration values that override the predefined values. |
env | Optional | Key-value JSON object | Specifies the Spark environment variables that are required for the job. |
size | Optional | Key-value pairs | Takes the parameters num_workers, worker_size, and master_size. |
num_workers | Required if size is specified | Integer | Specifies the number of worker nodes in the Spark cluster. num_workers is equal to the number of executors you want. The default is 1 executor per worker node. The maximum number of executors supported is 50. |
worker_size | Optional | Key-value pairs | Takes the parameters cpu and memory. |
cpu | Required if worker_size is specified | Integer | Specifies the amount of CPU for each worker node. Default is 1 CPU. Maximum is 10 CPU. |
memory | Required if worker_size is specified | Integer | Specifies the amount of memory for each worker node. Default is 1 GB. Maximum is 40 GB. |
master_size | Optional | Key-value pairs | Takes the parameters cpu and memory. |
cpu | Required if master_size is specified | Integer | Specifies the amount of CPU for the master node. Default is 1 CPU. Maximum is 10 CPU. |
memory | Required if master_size is specified | Integer | Specifies the amount of memory for the master node. Default is 1 GB. Maximum is 40 GB. |
kernel_size | Optional | Key-value pairs | Takes the parameters cpu and memory. |
cpu | Required if kernel_size is specified | Integer | Specifies the amount of CPU for the kernel node. Default is 1 CPU. Maximum is 10 CPU. |
memory | Required if kernel_size is specified | Integer | Specifies the amount of memory for the kernel node. Default is 1 GB. Maximum is 40 GB. |
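To illustrate where each parameter from the table sits in the request body, here is a hypothetical payload, expressed as a Python dictionary. It is not taken from the product samples: the environment variable name is invented, and the placement of env alongside conf under engine is an assumption based on the table's ordering.

# Hypothetical payload illustrating the parameters in the table above.
payload = {
    "name": "python310",                  # kernel name
    "kernel_size": {                      # kernel (driver) node resources
        "cpu": 2,
        "memory": "2g",
    },
    "engine": {
        "type": "spark",                  # required when engine is specified
        "conf": {                         # Spark configuration overrides
            "spark.eventLog.enabled": "false",
        },
        "env": {                          # assumed placement; Spark environment variables for the job
            "SAMPLE_ENV_VAR": "value",    # hypothetical variable name
        },
        "size": {
            "num_workers": "2",           # number of worker nodes (executors), passed as a string as in the samples
            "worker_size": {"cpu": 1, "memory": "1g"},
            "master_size": {"cpu": 1, "memory": "1g"},
        },
    },
}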
Viewing kernel status
After you have created your Spark kernel, you can view the kernel details.
To view the kernel status, enter:
curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"
Example response:
{
  "id": "<kernel_id>",
  "name": "scala",
  "connections": 0,
  "last_activity": "2021-07-16T11:06:26.266275Z",
  "execution_state": "starting"
}
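Because a newly created kernel begins in the starting state, you might want to poll the status endpoint until it is ready before you submit code. The following is a minimal sketch, assuming the TOKEN and KERNEL_ENDPOINT environment variables from the earlier steps; <kernel_id> is a placeholder for the ID returned by the create kernel call.

import os
import time
import requests

headers = {"Authorization": "Bearer {}".format(os.environ["TOKEN"])}
kernel_id = "<kernel_id>"  # placeholder: the id returned by the create kernel call
url = "{}/{}".format(os.environ["KERNEL_ENDPOINT"], kernel_id)

# Poll until the kernel leaves the 'starting' state.
while True:
    state = requests.get(url, headers=headers, verify=False).json()["execution_state"]
    print("Kernel state: {}".format(state))
    if state != "starting":
        break
    time.sleep(10)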
Deleting a kernel
You can delete a Spark kernel by entering the following:
curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"
Listing kernels
You can list all the active Spark kernels by entering the following:
curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}"
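To work with the returned list programmatically, here is a short sketch with the same assumed environment variables. It assumes that the list response is a JSON array whose entries use the same fields as the single-kernel response shown earlier.

import os
import requests

headers = {"Authorization": "Bearer {}".format(os.environ["TOKEN"])}

# GET on the kernel endpoint is assumed to return a JSON array of active kernels.
kernels = requests.get(os.environ["KERNEL_ENDPOINT"], headers=headers, verify=False).json()
for kernel in kernels:
    print(kernel["id"], kernel["name"], kernel["execution_state"])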
Using Spark kernels
You can use the Kernel API provided by Analytics Engine powered by Apache Spark to create a kernel, check the status of a kernel, submit code to it over a WebSocket connection, and delete the kernel.
The following Python sample code uses the Tornado library to make HTTP and WebSocket calls to a Jupyter Kernel Gateway service. You need a Python runtime environment with the Tornado package installed to run this sample code.
To create a Spark application:
- Install pip if you don't have it, and then install the Python Tornado package by using this command:
yum install python3-pip -y ; pip3 install tornado
- In a working directory, create a file called client.py that contains the following code. The sample code creates a Spark Scala kernel and submits Scala code to it for execution:
from uuid import uuid4
import requests
import json
from tornado import gen
from tornado.escape import json_encode, json_decode, url_escape
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.ioloop import IOLoop
from tornado.websocket import websocket_connect


@gen.coroutine
def main():
    url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/icp4d-api/v1/authorize"
    payload = json.dumps({
        "username": "magonzalez",
        "password": "magonzalez"
    })
    headers = {
        'cache-control': 'no-cache',
        'Content-Type': 'application/json'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    response_json = json.loads(response.text)
    token = response_json['token']

    kg_http_url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels"
    kg_ws_url = "wss://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels"
    headers = {"Authorization": 'Bearer {}'.format(token), "Content-Type": "application/json"}
    validate_cert = False
    kernel_name = "scala"
    kernel_payload = {"name": "scala", "kernel_size": {"cpu": 3}}
    print(kernel_payload)

    code = """
    print(s"Spark Version: ${sc.version}")
    print(s"Application Name: ${sc.appName}")
    print(s"Application ID: ${sc.applicationId}")
    import org.apache.spark.sql.SQLContext;
    val sqlContext = new SQLContext(sc);
    val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv");
    data_df_0.show(5)
    """

    print("Using kernel gateway URL: {}".format(kg_http_url))
    print("Using kernel websocket URL: {}".format(kg_ws_url))

    # Remove trailing "/" if it exists in the JKG URLs
    if kg_http_url.endswith("/"):
        kg_http_url = kg_http_url.rstrip('/')
    if kg_ws_url.endswith("/"):
        kg_ws_url = kg_ws_url.rstrip('/')

    client = AsyncHTTPClient()

    # Create kernel
    # POST /api/kernels
    print("Creating kernel {}...".format(kernel_name))
    response = yield client.fetch(
        kg_http_url,
        method='POST',
        headers=headers,
        validate_cert=validate_cert,
        body=json_encode(kernel_payload),
        connect_timeout=240,
        request_timeout=240
    )
    kernel = json_decode(response.body)
    kernel_id = kernel['id']
    print("Created kernel {0}.".format(kernel_id))

    # Connect to kernel websocket
    # GET /api/kernels/<kernel-id>/channels
    # Upgrade: websocket
    # Connection: Upgrade
    print("Connecting to kernel websocket...")
    ws_req = HTTPRequest(
        url='{}/{}/channels'.format(kg_ws_url, url_escape(kernel_id)),
        headers=headers,
        validate_cert=validate_cert,
        connect_timeout=240,
        request_timeout=240
    )
    ws = yield websocket_connect(ws_req)
    print("Connected to kernel websocket.")

    # Submit code to websocket on the 'shell' channel
    print("Submitting code: \n{}\n".format(code))
    msg_id = uuid4().hex
    req = json_encode({
        'header': {
            'username': '',
            'version': '5.0',
            'session': '',
            'msg_id': msg_id,
            'msg_type': 'execute_request'
        },
        'parent_header': {},
        'channel': 'shell',
        'content': {
            'code': code,
            'silent': False,
            'store_history': False,
            'user_expressions': {},
            'allow_stdin': False
        },
        'metadata': {},
        'buffers': {}
    })

    # Send an execute request
    ws.write_message(req)
    print("Code submitted. Waiting for response...")

    # Read websocket output until kernel status for this request becomes 'idle'
    kernel_idle = False
    while not kernel_idle:
        msg = yield ws.read_message()
        msg = json_decode(msg)
        msg_type = msg['msg_type']
        print("Received message type: {}".format(msg_type))
        if msg_type == 'error':
            print('ERROR')
            print(msg)
            break
        # evaluate messages that correspond to our request
        if 'msg_id' in msg['parent_header'] and \
                msg['parent_header']['msg_id'] == msg_id:
            if msg_type == 'stream':
                print("  Content: {}".format(msg['content']['text']))
            elif msg_type == 'status' and \
                    msg['content']['execution_state'] == 'idle':
                kernel_idle = True

    # Close websocket
    ws.close()

    # Delete kernel
    # DELETE /api/kernels/<kernel-id>
    print("Deleting kernel...")
    yield client.fetch(
        '{}/{}'.format(kg_http_url, kernel_id),
        method='DELETE',
        headers=headers,
        validate_cert=validate_cert,
    )
    print("Deleted kernel {0}.".format(kernel_id))


if __name__ == '__main__':
    IOLoop.current().run_sync(main)
where:
- <token> is the access token that you generated in Using the Kernel API. In this sample, the token is obtained by calling the Cloud Pak for Data authorization endpoint with a username and password.
- <kernel_endpoint> is the Spark kernel endpoint that you got in Using the Kernel API and that the sample uses as kg_http_url. Example of a kernel endpoint: https://<cp4d_route>/v4/analytics_engines/<instance_id>/jkg/api/kernels.
- <ws_kernel_endpoint> is the WebSocket endpoint that the sample uses as kg_ws_url. You create it by taking the Spark kernel endpoint and changing the https prefix to wss, as sketched below.
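As a small illustration of that last point, the WebSocket URL can be derived from the kernel endpoint by swapping the scheme; the endpoint value here is a placeholder.

# Placeholder kernel endpoint; substitute your Cloud Pak for Data route and instance ID.
kg_http_url = "https://<cp4d_route>/v4/analytics_engines/<instance_id>/jkg/api/kernels"

# The WebSocket endpoint is the same URL with the https prefix changed to wss.
kg_ws_url = "wss" + kg_http_url[len("https"):]
print(kg_ws_url)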
- Run the script file:
python client.py
Here are code snippets that show how the kernel name, kernel payload, and code variables can be modified in the client.py file for Python and R kernels:
- Python 3.10:
kernel_name = "python310"
kernel_payload = {"name": "python310"}
print(kernel_payload)
code = '\n'.join((
    "print(\"Spark Version: {}\".format(sc.version))",
    "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))",
    "print(\"Application ID: {} \".format(sc._jsc.sc().applicationId()))",
    "sc.parallelize([1,2,3,4,5]).count()"
))
- R 4.2:
kernel_name = "r42"
kernel_payload = {"name": "r42"}
code = """
cat("Spark Version: ", sparkR.version())
conf = sparkR.callJMethod(spark, "conf")
cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name"))
cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id"))
df <- as.DataFrame(list(1,2,3,4,5))
cat(count(df))
"""
Parent topic: Getting started with Spark applications