Spark 애플리케이션을 대화형으로 실행하기

커널 API를 활용하여 Spark 애플리케이션을 대화형으로 실행할 수 있습니다.

서비스로서의 커널(KaaS)은 다음을 제공합니다:

  • Jupyter 커널을 일류 객체로
  • 커널당 전용 클러스터
  • 사용자 라이브러리로 커스터마이징된 클러스터 및 커널

커널 API 사용

커널 API를 사용하여 Spark 애플리케이션을 대화식으로 실행할 수 있습니다. 각 애플리케이션은 전용 클러스터 내 커널에서 실행됩니다. API를 통해 전달하는 모든 구성 설정은 기본 구성을 재정의합니다.

Spark 애플리케이션을 대화형으로 실행하려면:

  1. 액세스 토큰이 없는 경우 생성하십시오. 액세스 토큰 생성하기를 참조하십시오.

  2. 토큰을 변수에 저장합니다:

    export TOKEN=<token generated>
    
  3. 인스턴스를 Analytics Engine powered by Apache Spark 제공하십시오. 프로젝트 또는 배포 공간에서 관리자 역할이 있어야 인스턴스를 프로비저닝할 수 있습니다. 인스턴스 프로비저닝을 참조하십시오.

  4. 인스턴스에 대한 Spark 커널 엔드포인트를 가져옵니다.

    1. 탐색 Cloud Pak for Data 네비게이션 메뉴 메뉴에서 서비스 > Cloud Pak for Data 인스턴스를 클릭하고, 해당 인스턴스를 찾아 클릭하여 인스턴스 세부 정보를 확인하세요.
    2. 액세스 정보에서 Spark 커널 엔드포인트를 복사하여 저장하십시오.
  5. 생성한 커널 엔드포인트와 액세스 토큰을 사용하여 커널을 생성하십시오.이 예제는 필요한 최소한의 필수 매개변수를 포함합니다:

    curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala" }'
    

커널 JSON 및 지원되는 매개변수 생성

커널 서비스를 사용하여 커널 생성 REST API를 시작하는 경우, 페이로드에 고급 구성을 추가할 수 있습니다.

다음과 같은 샘플 페이로드와 커널 생성 API에 전달할 수 있는 매개변수 목록을 고려하십시오:

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
  "name": "scala",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "conf": {
      "spark.ui.reverseProxy": "false",
      "spark.eventLog.enabled": "false"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}'

응답:

{
    "id": "<kernel_id>",
    "name": "scala",
    "connections": 0,
    "last_activity": "2021-07-16T11:06:26.266275Z",
    "execution_state": "starting"
}

사용자 정의 Spark 런타임을 사용하여 Spark 커널 API 활용

스파크 런타임 버전 변경을 위한 입력 페이로드 예시:

{
  "name": "python310",
  "kernel_size": {
    "cpu": 1,
    "memory": "1g"
  },
  "engine": {
    "type": "spark",
    "runtime": {
      "spark_version": "3.4"
    },
    "size": {
      "num_workers": "2",
      "worker_size": {
        "cpu": 1,
        "memory": "1g"
      }
    }
  }
}

Spark 커널 API 매개변수

다음은 커널 API에서 사용할 수 있는 매개변수입니다:

이름 필수/선택 유형 설명
이름 필수 문자열 커널 이름을 지정합니다. 지원되는 값은, r, r42, scala python39, 입니다. python310
엔진 선택적 키-값 쌍 Spark 런타임의 구성 및 버전 정보를 지정합니다
engine.runtime.spark_version 선택적 문자열 커널에 사용할 Spark 런타임 버전을 지정합니다. IBM Cloud Pak for Data Spark를 지원합니다 3.4.
유형 엔진이 지정된 경우 필수 문자열 커널 런타임 유형을 지정합니다. 현재 만 spark 지원됩니다.
conf 선택적 키-값 JSON 객체 사전 정의된 값을 재정의하는 Spark 구성 값을 지정합니다
환경 선택적 키-값 JSON 객체 작업에 필요한 Spark 환경 변수를 지정합니다
크기 선택적 매개변수 num_workers, worker_sizemaster_size
작업자 수 크기가 지정된 경우 필수 정수 Spark 클러스터의 워커 노드 수를 지정합니다. num_workers 원하는 실행자 수와 같습니다. 기본값은 작업자 노드당 1개의 실행기입니다. 지원되는 최대 실행자 수는 50개입니다.
작업자 크기 선택적 매개변수 cpu 와 를 memory받습니다.
cpu worker_size 지정된 경우 필수 정수 워커 노드에 할당할 CPU 양을 지정합니다. 기본값은 1개의 CPU입니다. 최대 10개 CPU
메모리 worker_size 지정된 경우 필수 정수 각 작업자 노드에 할당할 메모리 양을 지정합니다. 기본값은 1GB입니다. 최대 용량은 40GB입니다
마스터_사이즈 선택적 매개변수 cpu 와 를 받습니다. memory
cpu master_size 지정된 경우 필수 정수 마스터 노드에 할당할 CPU 양을 지정합니다. 기본값은 1개의 CPU입니다. 최대 10개 CPU
메모리 master_size 지정된 경우 필수 정수 각 마스터 노드에 할당할 메모리 양을 지정합니다. 기본값은 1GB입니다. 최대 용량은 40GB입니다
커널 크기 선택적 매개변수 cpu 와 를 받습니다. memory
cpu kernel_size 지정된 경우 필수 정수 커널 노드에 할당할 CPU 양을 지정합니다. 기본값은 1개의 CPU입니다. 최대 10개 CPU
메모리 kernel_size 지정된 경우 필수 정수 각 커널 노드에 할당할 메모리 양을 지정합니다. 기본값은 1GB입니다. 최대 용량은 40GB입니다

커널 상태 보기

Spark 커널을 생성한 후에는 커널 세부 정보를 확인할 수 있습니다.

커널 상태를 보려면 다음을 입력하십시오:

curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"

답변 예시:

{
  "id": "<kernel_id>",
  "name": "scala",
  "connections": 0,
  "last_activity": "2021-07-16T11:06:26.266275Z",
  "execution_state": "starting"
}

커널 삭제

다음 명령어를 입력하여 Spark 커널을 삭제할 수 있습니다:

curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"

커널 목록

다음 명령어를 입력하면 활성화된 모든 Spark 커널을 나열할 수 있습니다:

curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}"

Spark 커널 사용

제공하는 커널 API를 사용할 Analytics Engine Powered by Apache Spark 수 있습니다. 웹소켓 연결을 사용하여 커널 생성, 커널 상태 확인 및 커널 삭제.

다음 Python 샘플 코드는 Tornado 라이브러리를 사용하여 Jupyter 커널 게이트웨이 서비스에 대한 및 WebSocket 호출을 HTTP 수행합니다. 이 샘플 코드를 실행하려면 Tornado 패키지가 설치된 런타임 Python 환경이 필요합니다.

Spark 애플리케이션을 생성하려면:

  1. pip를 설치하세요. 설치되어 있지 않다면 다음 명령어를 사용하여 Python Tornado 패키지를 설치하세요:

    yum install  python3-pip -y ; pip3 install tornado
    
  2. 작업 디렉터리에서 라는 파일을 생성하고 client.py 다음 코드를 포함시킵니다.

    다음 샘플 코드는 Spark Scala 커널을 생성하고 실행을 위해 해당 커널에 코드를 Scala 제출합니다:

    from uuid import uuid4
    import requests
    import json
    from tornado import gen
    from tornado.escape import json_encode, json_decode, url_escape
    from tornado.httpclient import AsyncHTTPClient, HTTPRequest
    from tornado.ioloop import IOLoop
    from tornado.websocket import websocket_connect
    @gen.coroutine
    def main():
        url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/icp4d-api/v1/authorize"
    
        payload = json.dumps({
        "username": "magonzalez",
        "password": "magonzalez"
        })
        headers = {
        'cache-control': 'no-cache',
        'Content-Type': 'application/json'
        }
    
        response = requests.request("POST", url, headers=headers, data=payload)
    
        response_json = json.loads(response.text)
        token = response_json['token']
    
        kg_http_url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels"
        kg_ws_url = "wss://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels"
        headers = {"Authorization": 'Bearer {}'.format(token) , "Content-Type": "application/json"}
        validate_cert = False
        kernel_name="scala"
        kernel_payload={"name":"scala","kernel_size":{"cpu":3}}
        print(kernel_payload)
        code = """
            print(s"Spark Version: ${sc.version}")
            print(s"Application Name: ${sc.appName}")
            print(s"Application ID: ${sc.applicationId}")
            import org.apache.spark.sql.SQLContext;
            val sqlContext = new SQLContext(sc);
            val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv");
            data_df_0.show(5)
        """
        print("Using kernel gateway URL: {}".format(kg_http_url))
        print("Using kernel websocket URL: {}".format(kg_ws_url))
        # Remove "/" if exists in JKG url's
        if kg_http_url.endswith("/"):
            kg_http_url=kg_http_url.rstrip('/')
        if kg_ws_url.endswith("/"):
            kg_ws_url=kg_ws_url.rstrip('/')
        client = AsyncHTTPClient()
        # Create kernel
        # POST /api/kernels
        print("Creating kernel {}...".format(kernel_name))
        response = yield client.fetch(
            kg_http_url,
            method='POST',
            headers = headers,
            validate_cert=validate_cert,
            body=json_encode(kernel_payload),
            connect_timeout=240,
            request_timeout=240
        )
        kernel = json_decode(response.body)
        kernel_id = kernel['id']
        print("Created kernel {0}.".format(kernel_id))
        # Connect to kernel websocket
        # GET /api/kernels/<kernel-id>/channels
        # Upgrade: websocket
        # Connection: Upgrade
        print("Connecting to kernel websocket...")
        ws_req = HTTPRequest(url='{}/{}/channels'.format(
            kg_ws_url,
            url_escape(kernel_id)
        ),
            headers = headers,
            validate_cert=validate_cert,
            connect_timeout=240,
            request_timeout=240
        )
        ws = yield websocket_connect(ws_req)
        print("Connected to kernel websocket.")
        # Submit code to websocket on the 'shell' channel
        print("Submitting code: \n{}\n".format(code))
        msg_id = uuid4().hex
        req = json_encode({
            'header': {
                'username': '',
                'version': '5.0',
                'session': '',
                'msg_id': msg_id,
                'msg_type': 'execute_request'
            },
            'parent_header': {},
            'channel': 'shell',
            'content': {
                'code': code,
                'silent': False,
                'store_history': False,
                'user_expressions': {},
                'allow_stdin': False
            },
            'metadata': {},
            'buffers': {}
        })
        # Send an execute request
        ws.write_message(req)
        print("Code submitted. Waiting for response...")
        # Read websocket output until kernel status for this request becomes 'idle'
        kernel_idle = False
        while not kernel_idle:
            msg = yield ws.read_message()
            msg = json_decode(msg)
            msg_type = msg['msg_type']
            print ("Received message type: {}".format(msg_type))
            if msg_type == 'error':
                print('ERROR')
                print(msg)
                break
            # evaluate messages that correspond to our request
            if 'msg_id' in msg['parent_header'] and \
                            msg['parent_header']['msg_id'] == msg_id:
                if msg_type == 'stream':
                    print("  Content: {}".format(msg['content']['text']))
                elif msg_type == 'status' and \
                                msg['content']['execution_state'] == 'idle':
                    kernel_idle = True
        # close websocket
        ws.close()
        # Delete kernel
        # DELETE /api/kernels/<kernel-id>
        print("Deleting kernel...")
        yield client.fetch(
            '{}/{}'.format(kg_http_url, kernel_id),
            method='DELETE',
            headers = headers,
            validate_cert=validate_cert,
        )
        print("Deleted kernel {0}.".format(kernel_id))
    if __name__ == '__main__':
        IOLoop.current().run_sync(main)
    

    상황:

    • <token> 커널 API 사용 에서 생성한 액세스 토큰입니다.
    • <kernel_endpoint> 커널 API 사용에서 얻은 Spark 커널 엔드포인트입니다. 커널 엔드포인트 예시: https://<cp4d_route>/v4/analytics_engines/<instance_id>/jkg/api/kernels.
    • <ws_kernel_endpoint> 스파크 커널 엔드포인트를 가져와서 https 접두사를 wss로 변경하여 생성한 엔드포인트입니다 WebSocket.
  3. 스크립트 파일을 실행하십시오:

    python client.py
    

다음은 커널 이름, 커널 페이로드 및 코드 변수를 client.py 파일에서 및 R Python 커널용으로 수정하는 방법을 보여주는 코드 조각입니다:

  • Python 3.10:

    kernel_name="python310"
    kernel_payload={"name":"python310"}
    print(kernel_payload)
    code = '\n'.join(( "print(\"Spark Version: {}\".format(sc.version))", "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))", "print(\"Application ID: {} \".format(sc._jsc.sc().applicationId()))", "sc.parallelize([1,2,3,4,5]).count()" ))
    
  • R 4.2:

    kernel_name="r42"
    kernel_payload={"name":"r42"}
    code = """
    cat("Spark Version: ", sparkR.version())
    conf = sparkR.callJMethod(spark, "conf")
    cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name"))
    cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id"))
    df <- as.DataFrame(list(1,2,3,4,5))
    cat(count(df))
    """