IDAX 데이터 소스를 사용한 Spark 애플리케이션 작성
일반적으로 Spark 애플리케이션은 데이터베이스에서 데이터를 읽고, 이 데이터를 Spark 클러스터에서 고도로 병렬화된 방식으로 처리한 후 결과를 다시 데이터베이스에 기록합니다.
Spark 클러스터는 Db2® 클러스터 노드와 함께 위치합니다. 이처럼 함께 위치하면 Db2와 Spark 클러스터가 각 노드에서 데이터를 Db2 헤드 노드를 통해 리라우트할 필요 없이 서로 로컬로 전송할 수 있습니다. Db2에서 데이터가 파티션된 방식에 따라, 이러한 유형의 데이터 전송은 성능을 크게 향상시킬 수 있습니다.
공통 위치의 데이터 읽기를 사용하려면 Spark 애플리케이션에서 IDAX 데이터 소스를 사용하십시오.
IDAX 데이터 소스 매개변수의 이름 및 설명
다음 표는 일반 매개변수의 이름 및 설명을 보여줍니다.
| 일반 매개변수의 이름 | 설명 | 참고 |
|---|---|---|
| datasource name |
R의 경우 다음을 지정하십시오.
Python, Scala 및 Java의 경우 다음을 지정하십시오.
필수사항입니다. |
없음. |
| dbtable |
선택적으로 스키마가 포함된 테이블 이름입니다.
필수사항입니다. |
테이블 ID만 지원되며, 플레인 JDBC에서 허용되는 SQL 서브쿼리는 지원되지 않습니다. sqlPredicate 매개변수를 대신 사용하십시오. |
| url | JDBC URL이며, 예를 들면 다음과 같습니다.
선택사항입니다. 기본값: 유형 2 JDBC:
|
URL 형식 또는 유형 4 JDBC를 사용하는 경우 user 매개변수 및 password 매개변수는 필수입니다. |
| user | 데이터베이스 액세스를 위한 사용자 ID입니다. 유형 2 JDBC 연결의 경우 선택사항입니다. |
이 매개변수를 지정하지 않은 경우에는 Spark 작업을 제출하는 데 사용된 사용자 ID로 데이터베이스에 액세스합니다. |
| 비밀번호 | 지정된 사용자의 암호입니다. 유형 2 JDBC 연결의 경우 선택사항입니다. |
없음. |
다음 표는 데이터 읽기에 대한 추가 매개변수의 이름 및 설명을 보여줍니다.
| 데이터 읽기에 대한 추가 매개변수의 이름 | 설명 | 참고 |
|---|---|---|
| sqlPredicate | ID>2 AND ID<10과 같은 SQL 필터 표현식입니다. 선택사항입니다. |
없음. |
| colocated | 선택사항입니다. 기본값: TRUE | 이 매개변수를 TRUE로 설정하면 데이터를 로컬로 읽습니다. 데이터를 로컬로 읽으면 데이터 읽기 프로세스의 성능이 향상됩니다. 그 이유는 각 Spark 노드가 동일한 호스트의 데이터베이스 파티션에 저장된 데이터베이스 테이블에 있는 데이터의 일부만 읽기 때문입니다.
이 매개변수를 FALSE로 설정하면 모든 데이터를 헤드 노드를 통해 읽습니다. 유닛 테스트 개발 등과 같이 원격 Db2 클러스터의 데이터에 액세스하는 경우에는 이 설정이 필수입니다. |
다음 표는 데이터 쓰기에 대한 추가 매개변수의 이름 및 설명을 보여줍니다.
| 데이터 쓰기에 대한 추가 매개변수의 이름 | 설명 | 참고 |
|---|---|---|
| allowAppend | Spark 저장 모드 SaveMode.Append를 사용하려면 이 매개변수를 TRUE로 설정해야 합니다. 선택사항입니다. 기본값: FALSE |
추가된 데이터의 양이 크거나, 소스 데이터 프레임이 파티션된 경우에는 해당 추가 조작이 여러 트랜잭션을 포함할 수 있습니다. 이러한 트랜잭션 중 하나에서 오류가 발생하면 오류가 발생한 트랜잭션이 롤백되지만 그 이전에 커미트된 트랜잭션은 영향을 받지 않습니다. 결과적으로 전체 추가 조작이 부분적으로 완료될 수 있습니다. |
| truncateTable | 선택사항입니다. 기본값: FALSE | SaveMode.Overwrite 모드를 사용하며 출력 테이블이 있는 경우에는 truncateTable 매개변수를 TRUE로 설정할 수 있습니다. 이렇게 하면 테이블이 삭제되고 다시 작성되는 대신 잘립니다. 이 접근법은 출력 테이블이 volatile로 선언되는 등과 같이 특정 특성을 가진 경우 유용할 수 있습니다. |
| batchSize | 한 일괄처리의 행 수입니다. 선택사항입니다. 기본값: 1000 |
잠금 에스컬레이션 문제점이 발생하는 경우에만 기본값을 변경하십시오.
데이터가 지정된 크기의 일괄처리로 기록되며, n회의 일괄처리마다 트랜잭션이 커미트됩니다. 병렬 삽입 중 트랜잭션이 너무 커지는 경우에는 삽입기 중 하나에서 잠금 에스컬레이션이 발생하며 테이블에 대한 독점 쓰기 잠금을 얻습니다. 모든 다른 기록기의 트랜잭션은 기아상태가 되고 결국 SQLCODE -911이 발생하며 실패합니다. |
| maxBatchesInTransaction | 트랜잭션당 일괄처리 수입니다. 선택사항입니다. 기본값: 10 |
|
| maxInsertThreadsPerNode | 데이터를 데이터베이스에 삽입하는 데 사용되는 노드당 병렬 스레드의 수입니다. 선택사항입니다. 기본값: 8 또는 사용 가능한 프로세서 수를 4로 나눈 수. 둘 중 더 큰 수가 사용됩니다. |
이 값은 Spark 태스크에 대해 작성되는 Spark 셔플 파티션의 수에 영향을 줍니다. 스레드보다 Spark 파티션이 많은 경우에는 클러스터 노드의 파티션 수가 삽입 스레드의 수로 줄어듭니다. 이 설정이 너무 크면 Db2에서 예외가 발생할 수 있습니다. |
테이블로부터의 데이터 읽기에 대한 예
사용 중인 프로그래밍 언어에 따라 데이터 프레임 판독기에 대해 다음 데이터 소스 형식 및 옵션을 지정하십시오.
- Python, Scala 및 Java의 경우:
df = sparkSession.read \ .format("com.ibm.idax.spark.idaxsource") \ .options(dbtable="BLUADMIN.IRIS", sqlPredicate="ID>2 AND ID<10") \ .load() - R의 경우:
sql.df <- SparkR::read.df("BLUADMIN.IRIS", source="com.ibm.idax.spark.idaxsource", dbtable="BLUADMIN.IRIS", sqlPredicate="ID>2 AND ID<10")
테이블로부터의 데이터 쓰기에 대한 예
사용 중인 프로그래밍 언어에 따라 데이터 프레임 기록기에 대해 다음 데이터 소스 형식, 옵션 및 모드를 지정하십시오.
다음 예에서는 output이 SparkR 데이터 프레임이며 <SparkSaveMode>가 Spark 저장 모드 중 하나라고 가정합니다. 추가 정보는 안전 모드를 참조하고, Python에 대해서는 추가로 pyspark.sql 모듈을 참조하십시오.
- Python, Scala 및 Java의 경우:
output.write \ .format("com.ibm.idax.spark.idaxsource") \ .option("dbtable","MY_OUTPUT_TABLE") \ .mode(<SparkSaveMode>) \ .save() - R의 경우:
write.df(output, "MY_OUTPUT_TABLE", source = "com.ibm.idax.spark.idaxsource", mode = <SparkSaveMode>, dbtable = "MY_OUTPUT_TABLE") - 데이터가 기존 테이블에 추가되는 경우에는 allowAppend 옵션을 true로 설정해야 합니다.
- Scala 및 Java의 경우:
.option("allowAppend","true") .mode(SaveMode.Append) - For Python:
.option("allowAppend","true") .mode("append") - R의 경우:
mode = "append", allowAppend = "true"
- Scala 및 Java의 경우:
- 샘플 애플리케이션 SqlPredicateExample은 애플리케이션이 SQL 술어를 데이터베이스에 푸시하는 방법을 보여줍니다. 데이터의 서브세트만 페치하면 되므로 이 조치는 성능을 향상시킵니다.
- 샘플 애플리케이션 ExceptionExample은 예외가 파일
$HOME/spark/log/submission_id/submission.info에 기록되도록 애플리케이션을 코딩하는 방법을 보여줍니다.
한계 및 제한사항
- Apache log4j 로깅을 Spark 애플리케이션 코드에서 사용자 정의하거나, 애플리케이션 JAR 파일과 log4j 구성 파일을 함께 패키징하여 사용자 정의하지 마십시오. Spark 통합에서는 로그 레벨이 INFO인 org.apache.spark 패키지에 대한 로그 출력이 애플리케이션의 표준 출력에 기록되어야 합니다.
- Spark에서 액세스하는 데이터베이스 컬럼에는 다음 데이터 유형 중 하나가 있어야 합니다.
- CHAR
- VARCHAR
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- REAL
- DECIMAL
- NUMERIC
- DATE
- TIMESTAMP
- TIMESTAMP 변수의 최대 정밀도는 소수점 여섯째 자리(마이크로초 정밀도)입니다. 추가 소수점 자리는 데이터베이스에서 읽은 후, 또는 데이터베이스에 쓰기 전에 잘립니다.
- Db2 Warehouse 보기는 애플리케이션의 입력 소스로 사용할 수 없습니다.
- Db2 Warehouse 데이터베이스 테이블에 데이터를 기록하는 중에 Spark 애플리케이션에서 오류가 발생하는 경우에는 데이터베이스 테이블에 중복 항목이 생기는 것을 방지하기 위해 Spark 재시도가 중지됩니다. Db2 Warehouse 데이터베이스 테이블에서 데이터를 읽는 중에 Spark 애플리케이션에서 오류가 발생하는 경우에는 Spark 재시도가 올바르게 작동합니다.