Spark 애플리케이션을 대화형으로 실행하기
커널 API를 활용하여 Spark 애플리케이션을 대화형으로 실행할 수 있습니다.
서비스로서의 커널(KaaS)은 다음을 제공합니다:
- Jupyter 커널을 일류 객체로
- 커널당 전용 클러스터
- 사용자 라이브러리로 커스터마이징된 클러스터 및 커널
커널 API 사용
커널 API를 사용하여 Spark 애플리케이션을 대화식으로 실행할 수 있습니다. 각 애플리케이션은 전용 클러스터 내 커널에서 실행됩니다. API를 통해 전달하는 모든 구성 설정은 기본 구성을 재정의합니다.
Spark 애플리케이션을 대화형으로 실행하려면:
액세스 토큰이 없는 경우 생성하십시오. 액세스 토큰 생성하기를 참조하십시오.
토큰을 변수에 저장합니다:
export TOKEN=<token generated>인스턴스를 Analytics Engine powered by Apache Spark 제공하십시오. 프로젝트 또는 배포 공간에서 관리자 역할이 있어야 인스턴스를 프로비저닝할 수 있습니다. 인스턴스 프로비저닝을 참조하십시오.
인스턴스에 대한 Spark 커널 엔드포인트를 가져옵니다.
- 탐색
메뉴에서 서비스 > Cloud Pak for Data 인스턴스를 클릭하고, 해당 인스턴스를 찾아 클릭하여 인스턴스 세부 정보를 확인하세요.
- 액세스 정보에서 Spark 커널 엔드포인트를 복사하여 저장하십시오.
- 탐색
생성한 커널 엔드포인트와 액세스 토큰을 사용하여 커널을 생성하십시오.이 예제는 필요한 최소한의 필수 매개변수를 포함합니다:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{"name":"scala" }'
커널 JSON 및 지원되는 매개변수 생성
커널 서비스를 사용하여 커널 생성 REST API를 시작하는 경우, 페이로드에 고급 구성을 추가할 수 있습니다.
다음과 같은 샘플 페이로드와 커널 생성 API에 전달할 수 있는 매개변수 목록을 고려하십시오:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}" -d '{
"name": "scala",
"kernel_size": {
"cpu": 1,
"memory": "1g"
},
"engine": {
"type": "spark",
"conf": {
"spark.ui.reverseProxy": "false",
"spark.eventLog.enabled": "false"
},
"size": {
"num_workers": "2",
"worker_size": {
"cpu": 1,
"memory": "1g"
}
}
}
}'
응답:
{
"id": "<kernel_id>",
"name": "scala",
"connections": 0,
"last_activity": "2021-07-16T11:06:26.266275Z",
"execution_state": "starting"
}
사용자 정의 Spark 런타임을 사용하여 Spark 커널 API 활용
스파크 런타임 버전 변경을 위한 입력 페이로드 예시:
{
"name": "python310",
"kernel_size": {
"cpu": 1,
"memory": "1g"
},
"engine": {
"type": "spark",
"runtime": {
"spark_version": "3.4"
},
"size": {
"num_workers": "2",
"worker_size": {
"cpu": 1,
"memory": "1g"
}
}
}
}
Spark 커널 API 매개변수
다음은 커널 API에서 사용할 수 있는 매개변수입니다:
| 이름 | 필수/선택 | 유형 | 설명 |
|---|---|---|---|
| 이름 | 필수 | 문자열 | 커널 이름을 지정합니다. 지원되는 값은, r, r42, scala python39, 입니다. python310 |
| 엔진 | 선택적 | 키-값 쌍 | Spark 런타임의 구성 및 버전 정보를 지정합니다 |
| engine.runtime.spark_version | 선택적 | 문자열 | 커널에 사용할 Spark 런타임 버전을 지정합니다. IBM Cloud Pak for Data Spark를 지원합니다 3.4. |
| 유형 | 엔진이 지정된 경우 필수 | 문자열 | 커널 런타임 유형을 지정합니다. 현재 만 spark 지원됩니다. |
| conf | 선택적 | 키-값 JSON 객체 | 사전 정의된 값을 재정의하는 Spark 구성 값을 지정합니다 |
| 환경 | 선택적 | 키-값 JSON 객체 | 작업에 필요한 Spark 환경 변수를 지정합니다 |
| 크기 | 선택적 | 매개변수 num_workers, worker_size 및 master_size |
|
| 작업자 수 | 크기가 지정된 경우 필수 | 정수 | Spark 클러스터의 워커 노드 수를 지정합니다. num_workers 원하는 실행자 수와 같습니다. 기본값은 작업자 노드당 1개의 실행기입니다. 지원되는 최대 실행자 수는 50개입니다. |
| 작업자 크기 | 선택적 | 매개변수 cpu 와 를 memory받습니다. |
|
| cpu | 가 worker_size 지정된 경우 필수 |
정수 | 워커 노드에 할당할 CPU 양을 지정합니다. 기본값은 1개의 CPU입니다. 최대 10개 CPU |
| 메모리 | 가 worker_size 지정된 경우 필수 |
정수 | 각 작업자 노드에 할당할 메모리 양을 지정합니다. 기본값은 1GB입니다. 최대 용량은 40GB입니다 |
| 마스터_사이즈 | 선택적 | 매개변수 cpu 와 를 받습니다. memory |
|
| cpu | 가 master_size 지정된 경우 필수 |
정수 | 마스터 노드에 할당할 CPU 양을 지정합니다. 기본값은 1개의 CPU입니다. 최대 10개 CPU |
| 메모리 | 가 master_size 지정된 경우 필수 |
정수 | 각 마스터 노드에 할당할 메모리 양을 지정합니다. 기본값은 1GB입니다. 최대 용량은 40GB입니다 |
| 커널 크기 | 선택적 | 매개변수 cpu 와 를 받습니다. memory |
|
| cpu | 가 kernel_size 지정된 경우 필수 |
정수 | 커널 노드에 할당할 CPU 양을 지정합니다. 기본값은 1개의 CPU입니다. 최대 10개 CPU |
| 메모리 | 가 kernel_size 지정된 경우 필수 |
정수 | 각 커널 노드에 할당할 메모리 양을 지정합니다. 기본값은 1GB입니다. 최대 용량은 40GB입니다 |
커널 상태 보기
Spark 커널을 생성한 후에는 커널 세부 정보를 확인할 수 있습니다.
커널 상태를 보려면 다음을 입력하십시오:
curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"
답변 예시:
{
"id": "<kernel_id>",
"name": "scala",
"connections": 0,
"last_activity": "2021-07-16T11:06:26.266275Z",
"execution_state": "starting"
}
커널 삭제
다음 명령어를 입력하여 Spark 커널을 삭제할 수 있습니다:
curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer ${TOKEN}"
커널 목록
다음 명령어를 입력하면 활성화된 모든 Spark 커널을 나열할 수 있습니다:
curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer ${TOKEN}"
Spark 커널 사용
제공하는 커널 API를 사용할 Analytics Engine Powered by Apache Spark 수 있습니다. 웹소켓 연결을 사용하여 커널 생성, 커널 상태 확인 및 커널 삭제.
다음 Python 샘플 코드는 Tornado 라이브러리를 사용하여 Jupyter 커널 게이트웨이 서비스에 대한 및 WebSocket 호출을 HTTP 수행합니다. 이 샘플 코드를 실행하려면 Tornado 패키지가 설치된 런타임 Python 환경이 필요합니다.
Spark 애플리케이션을 생성하려면:
pip를 설치하세요. 설치되어 있지 않다면 다음 명령어를 사용하여 Python Tornado 패키지를 설치하세요:
yum install python3-pip -y ; pip3 install tornado작업 디렉터리에서 라는 파일을 생성하고
client.py다음 코드를 포함시킵니다.다음 샘플 코드는 Spark Scala 커널을 생성하고 실행을 위해 해당 커널에 코드를 Scala 제출합니다:
from uuid import uuid4 import requests import json from tornado import gen from tornado.escape import json_encode, json_decode, url_escape from tornado.httpclient import AsyncHTTPClient, HTTPRequest from tornado.ioloop import IOLoop from tornado.websocket import websocket_connect @gen.coroutine def main(): url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/icp4d-api/v1/authorize" payload = json.dumps({ "username": "magonzalez", "password": "magonzalez" }) headers = { 'cache-control': 'no-cache', 'Content-Type': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) response_json = json.loads(response.text) token = response_json['token'] kg_http_url = "https://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels" kg_ws_url = "wss://cpd-cpd-instance.apps.ocp-270005she9-tcvv.cloud.techzone.ibm.com/v4/analytics_engines/c74aa35e-9fcb-4f9a-8e41-59fbdfe1054a/jkg/api/kernels" headers = {"Authorization": 'Bearer {}'.format(token) , "Content-Type": "application/json"} validate_cert = False kernel_name="scala" kernel_payload={"name":"scala","kernel_size":{"cpu":3}} print(kernel_payload) code = """ print(s"Spark Version: ${sc.version}") print(s"Application Name: ${sc.appName}") print(s"Application ID: ${sc.applicationId}") import org.apache.spark.sql.SQLContext; val sqlContext = new SQLContext(sc); val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv"); data_df_0.show(5) """ print("Using kernel gateway URL: {}".format(kg_http_url)) print("Using kernel websocket URL: {}".format(kg_ws_url)) # Remove "/" if exists in JKG url's if kg_http_url.endswith("/"): kg_http_url=kg_http_url.rstrip('/') if kg_ws_url.endswith("/"): kg_ws_url=kg_ws_url.rstrip('/') client = AsyncHTTPClient() # Create kernel # POST /api/kernels print("Creating kernel {}...".format(kernel_name)) response = yield client.fetch( kg_http_url, method='POST', headers = headers, validate_cert=validate_cert, body=json_encode(kernel_payload), connect_timeout=240, request_timeout=240 ) kernel = json_decode(response.body) kernel_id = kernel['id'] print("Created kernel {0}.".format(kernel_id)) # Connect to kernel websocket # GET /api/kernels/<kernel-id>/channels # Upgrade: websocket # Connection: Upgrade print("Connecting to kernel websocket...") ws_req = HTTPRequest(url='{}/{}/channels'.format( kg_ws_url, url_escape(kernel_id) ), headers = headers, validate_cert=validate_cert, connect_timeout=240, request_timeout=240 ) ws = yield websocket_connect(ws_req) print("Connected to kernel websocket.") # Submit code to websocket on the 'shell' channel print("Submitting code: \n{}\n".format(code)) msg_id = uuid4().hex req = json_encode({ 'header': { 'username': '', 'version': '5.0', 'session': '', 'msg_id': msg_id, 'msg_type': 'execute_request' }, 'parent_header': {}, 'channel': 'shell', 'content': { 'code': code, 'silent': False, 'store_history': False, 'user_expressions': {}, 'allow_stdin': False }, 'metadata': {}, 'buffers': {} }) # Send an execute request ws.write_message(req) print("Code submitted. Waiting for response...") # Read websocket output until kernel status for this request becomes 'idle' kernel_idle = False while not kernel_idle: msg = yield ws.read_message() msg = json_decode(msg) msg_type = msg['msg_type'] print ("Received message type: {}".format(msg_type)) if msg_type == 'error': print('ERROR') print(msg) break # evaluate messages that correspond to our request if 'msg_id' in msg['parent_header'] and \ msg['parent_header']['msg_id'] == msg_id: if msg_type == 'stream': print(" Content: {}".format(msg['content']['text'])) elif msg_type == 'status' and \ msg['content']['execution_state'] == 'idle': kernel_idle = True # close websocket ws.close() # Delete kernel # DELETE /api/kernels/<kernel-id> print("Deleting kernel...") yield client.fetch( '{}/{}'.format(kg_http_url, kernel_id), method='DELETE', headers = headers, validate_cert=validate_cert, ) print("Deleted kernel {0}.".format(kernel_id)) if __name__ == '__main__': IOLoop.current().run_sync(main)상황:
스크립트 파일을 실행하십시오:
python client.py
다음은 커널 이름, 커널 페이로드 및 코드 변수를 client.py 파일에서 및 R Python 커널용으로 수정하는 방법을 보여주는 코드 조각입니다:
Python 3.10:
kernel_name="python310" kernel_payload={"name":"python310"} print(kernel_payload) code = '\n'.join(( "print(\"Spark Version: {}\".format(sc.version))", "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))", "print(\"Application ID: {} \".format(sc._jsc.sc().applicationId()))", "sc.parallelize([1,2,3,4,5]).count()" ))R 4.2:
kernel_name="r42" kernel_payload={"name":"r42"} code = """ cat("Spark Version: ", sparkR.version()) conf = sparkR.callJMethod(spark, "conf") cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name")) cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id")) df <- as.DataFrame(list(1,2,3,4,5)) cat(count(df)) """