Running Spark applications interactively

You can run your Spark applications interactively by leveraging the Kernel API.

Kernel as a service provides:

  • Jupyter kernels as first-class entities
  • A dedicated cluster per kernel
  • Clusters and kernels that are customized with user libraries

Using the Kernel API

You can run a Spark application interactively by using the Kernel API. Each application runs in a kernel in a dedicated cluster. Any configuration settings that you pass through the API override the default configurations.

To run a Spark application interactively:

  1. Generate an access token if you do not have one. See Generate an access token.

  2. Export the token into a variable:

    export TOKEN=<token generated>
    
  3. Provision an Analytics Engine powered by Apache Spark instance. You need an Administrator role in the project or in the deployment space to provision an instance. See Provisioning an instance.

  4. Get the Spark kernel endpoint for the instance.

    1. From the Cloud Pak for Data navigation menu, click Services > Instances, find the instance, and click it to view the instance details.
    2. Under Access information, copy and save the Spark kernel endpoint.
  5. Create a kernel by using the kernel endpoint and the access token that you generated. The following example includes only the mandatory parameters; the response returns a kernel ID that you need in later calls (see the example after this procedure):

    curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala" }'
    
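The response includes the ID of the new kernel, which you need later to check the kernel status or to delete the kernel. For example, you might capture it in a variable; this sketch assumes that the jq command-line JSON processor is installed:

export KERNEL_ID=$(curl -k -s -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala"}' | jq -r '.id')
echo "Created kernel ${KERNEL_ID}"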

Create kernel JSON and supported parameters

When you call the create kernel REST API of the Kernel service, you can add advanced configurations to the payload.

Consider the following sample payload and the list of parameters that you can pass to the create kernel API:

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
  "name": "scala",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "conf": {
      "spark.ui.reverseProxy": "false",
      "spark.eventLog.enabled": "false"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}'

Response:

{
    "id": "<kernel_id>",
    "name": "scala",
    "connections": 0,
    "last_activity": "2021-07-16T11:06:26.266275Z",
    "execution_state": "starting"
}

Spark kernel API with a custom Spark runtime

An example of an input payload that changes the Spark runtime version:

{
  "name": "python310",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "runtime": {
      "spark_version": "3.4"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}
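
You might submit this payload with the same kernel endpoint and access token that you used earlier. The following sketch shows a trimmed version of the request that keeps only the name and runtime settings (the other fields are optional):

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
  "name": "python310",
  "engine": {
    "type": "spark",
    "runtime": {
      "spark_version": "3.4"
    }
  }
}'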

Spark kernels API parameters

These are the parameters that you can use in the Kernel API:

| Name | Required/Optional | Type | Description |
| name | Required | String | Specifies the kernel name. Supported values are scala, r, r42, python39, and python310. |
| engine | Optional | Key-value pairs | Specifies the Spark runtime with configuration and version information. |
| engine.runtime.spark_version | Optional | String | Specifies the Spark runtime version to be used for the kernel. IBM Cloud Pak for Data supports Spark 3.4. |
| type | Required if engine is specified | String | Specifies the kernel runtime type. Currently, only spark is supported. |
| conf | Optional | Key-value JSON object | Specifies the Spark configuration values that override the predefined values. |
| env | Optional | Key-value JSON object | Specifies the Spark environment variables that are required for the job. |
| size | Optional | JSON object | Takes the parameters num_workers, worker_size, and master_size. |
| num_workers | Required if size is specified | Integer | Specifies the number of worker nodes in the Spark cluster. num_workers is equal to the number of executors that you want. The default is 1 executor per worker node. The maximum number of executors supported is 50. |
| worker_size | Optional | JSON object | Takes the parameters cpu and memory. |
| cpu | Required if worker_size is specified | Integer | Specifies the amount of CPU for each worker node. Default is 1 CPU. Maximum is 10 CPU. |
| memory | Required if worker_size is specified | Integer | Specifies the amount of memory for each worker node. Default is 1 GB. Maximum is 40 GB. |
| master_size | Optional | JSON object | Takes the parameters cpu and memory. |
| cpu | Required if master_size is specified | Integer | Specifies the amount of CPU for the master node. Default is 1 CPU. Maximum is 10 CPU. |
| memory | Required if master_size is specified | Integer | Specifies the amount of memory for the master node. Default is 1 GB. Maximum is 40 GB. |
| kernel_size | Optional | JSON object | Takes the parameters cpu and memory. |
| cpu | Required if kernel_size is specified | Integer | Specifies the amount of CPU for the kernel node. Default is 1 CPU. Maximum is 10 CPU. |
| memory | Required if kernel_size is specified | Integer | Specifies the amount of memory for the kernel node. Default is 1 GB. Maximum is 40 GB. |
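
For example, a create kernel request that sets Spark environment variables and a custom master node size might look like the following sketch. It assumes that env is nested under engine alongside conf, as in the earlier sample payload, and SAMPLE_ENV_KEY is an illustrative variable name:

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
  "name": "python310",
  "engine": {
    "type": "spark",
    "env": {
      "SAMPLE_ENV_KEY": "sample-value"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 2,
        "memory": "2g"
      },
      "master_size": {
        "cpu": 2,
        "memory": "2g"
      }
    }
  }
}'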

Viewing kernel status

After you have created your Spark kernel, you can view the kernel details.

To view the kernel status, enter:

curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"

Example response:

{
  "id": "<kernel_id>",
  "name": "scala",
  "connections": 0,
  "last_activity": "2021-07-16T11:06:26.266275Z",
  "execution_state": "starting"
}
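
If you script against the API, you can poll this endpoint until the kernel leaves the starting state. A minimal sketch, assuming that the jq command-line JSON processor is installed:

while [ "$(curl -k -s -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}" | jq -r '.execution_state')" = "starting" ]; do
  echo "Kernel is still starting..."
  sleep 5
done
echo "Kernel is ready."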

Deleting a kernel

You can delete a Spark kernel by entering the following:

curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"

Listing kernels

You can list all the active Spark kernels by entering the following:

curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}"
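
The response is typically a JSON array of kernel objects like the one returned when you create a kernel. For example, you might print only the kernel IDs by piping the output through jq (assuming that it is installed):

curl -k -s -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" | jq -r '.[].id'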

Using Spark kernels

You can use the Kernel API that is provided by Analytics Engine powered by Apache Spark to create a kernel, submit code to it over a WebSocket connection, check the status of the kernel, and delete it.

The following Python sample code uses the requests and Tornado libraries to make HTTP and WebSocket calls to a Jupyter Kernel Gateway service. You need a Python runtime environment with the requests and Tornado packages installed to run this sample code.

To create a Spark application:

  1. Install pip if you do not already have it, and then install the Python Tornado and requests packages by using this command:

    yum install python3-pip -y ; pip3 install tornado requests
    
  2. In a working directory, create a file called client.py containing the following code.

    The following sample code creates a Spark Scala kernel and submits Scala code to it for execution:

    from uuid import uuid4
    import requests
    import json
    from tornado import gen
    from tornado.escape import json_encode, json_decode, url_escape
    from tornado.httpclient import AsyncHTTPClient, HTTPRequest
    from tornado.ioloop import IOLoop
    from tornado.websocket import websocket_connect
    @gen.coroutine
    def main():
        url = "https://<cp4d_route>/icp4d-api/v1/authorize"

        payload = json.dumps({
            "username": "<username>",
            "password": "<password>"
        })
        headers = {
            'cache-control': 'no-cache',
            'Content-Type': 'application/json'
        }
    
        response = requests.request("POST", url, headers=headers, data=payload)
    
        response_json = json.loads(response.text)
        token = response_json['token']
    
        kg_http_url = "<kernel_endpoint>"
        kg_ws_url = "<ws_kernel_endpoint>"
        headers = {"Authorization": 'Bearer {}'.format(token), "Content-Type": "application/json"}
        validate_cert = False
        kernel_name="scala"
        kernel_payload={"name":"scala","kernel_size":{"cpu":3}}
        print(kernel_payload)
        code = """
            print(s"Spark Version: ${sc.version}")
            print(s"Application Name: ${sc.appName}")
            print(s"Application ID: ${sc.applicationId}")
            import org.apache.spark.sql.SQLContext;
            val sqlContext = new SQLContext(sc);
            val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv");
            data_df_0.show(5)
        """
        print("Using kernel gateway URL: {}".format(kg_http_url))
        print("Using kernel websocket URL: {}".format(kg_ws_url))
        # Remove a trailing "/" from the JKG URLs if present
        if kg_http_url.endswith("/"):
            kg_http_url=kg_http_url.rstrip('/')
        if kg_ws_url.endswith("/"):
            kg_ws_url=kg_ws_url.rstrip('/')
        client = AsyncHTTPClient()
        # Create kernel
        # POST /api/kernels
        print("Creating kernel {}...".format(kernel_name))
        response = yield client.fetch(
            kg_http_url,
            method='POST',
            headers = headers,
            validate_cert=validate_cert,
            body=json_encode(kernel_payload),
            connect_timeout=240,
            request_timeout=240
        )
        kernel = json_decode(response.body)
        kernel_id = kernel['id']
        print("Created kernel {0}.".format(kernel_id))
        # Connect to kernel websocket
        # GET /api/kernels/<kernel-id>/channels
        # Upgrade: websocket
        # Connection: Upgrade
        print("Connecting to kernel websocket...")
        ws_req = HTTPRequest(url='{}/{}/channels'.format(
            kg_ws_url,
            url_escape(kernel_id)
        ),
            headers = headers,
            validate_cert=validate_cert,
            connect_timeout=240,
            request_timeout=240
        )
        ws = yield websocket_connect(ws_req)
        print("Connected to kernel websocket.")
        # Submit code to websocket on the 'shell' channel
        print("Submitting code: \n{}\n".format(code))
        msg_id = uuid4().hex
        req = json_encode({
            'header': {
                'username': '',
                'version': '5.0',
                'session': '',
                'msg_id': msg_id,
                'msg_type': 'execute_request'
            },
            'parent_header': {},
            'channel': 'shell',
            'content': {
                'code': code,
                'silent': False,
                'store_history': False,
                'user_expressions': {},
                'allow_stdin': False
            },
            'metadata': {},
            'buffers': {}
        })
        # Send an execute request
        ws.write_message(req)
        print("Code submitted. Waiting for response...")
        # Read websocket output until kernel status for this request becomes 'idle'
        kernel_idle = False
        while not kernel_idle:
            msg = yield ws.read_message()
            msg = json_decode(msg)
            msg_type = msg['msg_type']
            print("Received message type: {}".format(msg_type))
            if msg_type == 'error':
                print('ERROR')
                print(msg)
                break
            # evaluate messages that correspond to our request
            if 'msg_id' in msg['parent_header'] and \
                            msg['parent_header']['msg_id'] == msg_id:
                if msg_type == 'stream':
                    print("  Content: {}".format(msg['content']['text']))
                elif msg_type == 'status' and \
                                msg['content']['execution_state'] == 'idle':
                    kernel_idle = True
        # close websocket
        ws.close()
        # Delete kernel
        # DELETE /api/kernels/<kernel-id>
        print("Deleting kernel...")
        yield client.fetch(
            '{}/{}'.format(kg_http_url, kernel_id),
            method='DELETE',
            headers = headers,
            validate_cert=validate_cert,
        )
        print("Deleted kernel {0}.".format(kernel_id))
    if __name__ == '__main__':
        IOLoop.current().run_sync(main)
    

    where:

    • <cp4d_route>, <username>, and <password> are the URL of your Cloud Pak for Data instance and the credentials that the script uses to generate an access token. See Generate an access token.
    • <kernel_endpoint> is the Spark kernel endpoint that you got in Using the Kernel API. Example of a kernel endpoint: https://<cp4d_route>/v4/analytics_engines/<instance_id>/jkg/api/kernels.
    • <ws_kernel_endpoint> is the WebSocket endpoint that you create by taking the Spark kernel endpoint and changing the https prefix to wss.
  3. Run the script file:

    python client.py
    

The following code snippets show how the kernel_name, kernel_payload, and code variables can be modified in the client.py file for Python and R kernels:

  • Python 3.10:

    kernel_name="python310"
    kernel_payload={"name":"python310"}
    print(kernel_payload)
    code = '\n'.join((
        "print(\"Spark Version: {}\".format(sc.version))",
        "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))",
        "print(\"Application ID: {}\".format(sc._jsc.sc().applicationId()))",
        "sc.parallelize([1,2,3,4,5]).count()"
    ))
    
  • R 4.2:

    kernel_name="r42"
    kernel_payload={"name":"r42"}
    code = """
    cat("Spark Version: ", sparkR.version())
    conf = sparkR.callJMethod(spark, "conf")
    cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name"))
    cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id"))
    df <- as.DataFrame(list(1,2,3,4,5))
    cat(count(df))
    """
    

Parent topic: Getting started with Spark applications