ETL과 아파치 하둡
일반적으로 기업 인프라를 구성하는 시스템에서는 매우 다양한 로그 파일과 데이터를 생성합니다. 이러한 로그 파일을 처리하려면 적절하게 데이터를 추출하고, 사용하기 쉬운 형태로 가공해야 하며, 데이터베이스 같은 데이터 스토어에 로딩해야 합니다. 이런 일련의 작업을 지원하는 시스템을 ETL(Extract, Transform, Load) 시스템이라고 합니다.
ETL 시스템의 목적 한 가지는 빠르고 손쉽게 데이터를 가공하는 것이므로 손쉽게 구현하기 위한 GUI를 제공하고 이 GUI로 데이터의 흐름, 그리고 각 단계에서 데이터 형식을 변경하거나 처리하는 모듈을 추가하는 것을 지원합니다.
아파치 하둡(Apache Hadoop) 기반 시스템에서는 원시 로그를 그대로 HDFS에 보관하고 처리하는 특징 때문에 실제 처리를 위해 중간에 적절하게 ETL 모듈을 추가하여 처리해야 할 필요성이 있습니다.
일반적으로 ETL 시스템을 앞에 두고 그 뒤에 실제 데이터를 처리하는 데이터베이스가 있는 경우 데이터베이스에 데이터를 넣으려면 ETL을 반드시 거쳐야 합니다. 대용량 파일 시스템을 구비하지 않는 경우 덩치 큰 로그 파일을 있는 그대로 보관할 수 없는 문제 때문에 처리하기 전에 반드시 ETL을 거쳐 파일을 변환하거나 불필요한 칼럼을 삭제하거나, 노이즈를 제거해 파일 크기를 줄이고 원하는 형태로 만들어야 합니다.
하지만 아파치 하둡의 경우 분산 파일 시스템인 HDFS가 있고 파일 시스템이 투명하게 확장되므로 용량 제약에서 해방됩니다. 그래서 ETL을 먼저 거치기보다는 일단 로그 파일을 파일 시스템에 저장하고 실제 데이터를 처리해야 하는 시점에서 ETL 모듈을 통해 제거하고 사용합니다. 기존 ETL 시스템에 제일 나중에 로딩한다면 반대로 아파치 하둡을 제일 먼저 로딩하는 것입니다.
아파치 하둡 기반으로 대용량 데이터를 처리하다 보면 ETL 시스템에서 각종 ETL 모듈이 필요하게 되며 이것을 맵리듀스(MapReduce) API로 구현하기도 하고, 구현하기 어려운 것은 피그(Pig)의 피그 라틴(Pig Latin)을 이용하여 스크립트를 작성하기도 합니다. 이 글에서는 피그 라틴보다는 고성능으로 대용량 파일을 처리할 수 있는 맵리듀스 API를 이용하여 ETL을 구현하는 방법을 알아보겠습니다.
ETL 모듈을 구현하기 전 아파치 하둡 특성 파악하기
ETL 모듈은 아파치 하둡의 맵리듀스를 이해하는 데 상당히 도움이 됩니다. 초기에 이러한 모듈을 구현하려면 상당한 시행착오를 거치는데 그것은 아파치 하둡의 특성을 제대로 파악하지 못했기 때문입니다. 그래서 ETL 모듈을 구현하기 전에 아파치 하둡의 맵리듀스 동작 특성을 간단하게 정리해보겠습니다.
- 하나 이상 입력 경로를 지정할 수 있다.
- 꼭 Reducer를 써야 하는 것은 아니다.
- Reducer를 이용하면 Group By 효과를 볼 수 있다.
- Reducer를 거친 데이터는 정렬된다.
- Input Split 크기를 줄여 동시에 동작하는 Mapper 수를 늘려 성능 향상을 꾀할 수 있다.
- Mapper에서 Reducer로 Key/Value를 전달할 때 Hash Practitioner가 동작하여 Key % Reducer 개수의 결과값을 기반으로 Key/Value를 전달할 Reducer를 결정한다.
- 숫자라도 String과 IntWritable/LongWritable 어떤 것으로 표현하느냐에 따라 정렬 결과가 다를 수 있다.
- 최종 결과를 얻기 위해 하나 이상의 Mapper/Reducer를 사용할 수도 있다.
Clean ETL
Clean ETL 모듈은 실제로 현업에서 굉장히 많이 사용하는 ETL 모듈 중 하나입니다. 로그 파일의 칼럼이 20여 개로 구성되어 있지만 실제로 필요한 칼럼이 세 개라면 나머지 칼럼 17개는 불필요하게 됩니다. 실제로 처리할 모듈이 칼럼 세 개만 받는다고 가정하면(일반적으로 이렇게 구현하는 경우가 대부분입니다) 20개 칼럼으로 구성된 로그 파일을 입력으로 사용하는 경우 적절하게 처리할 수 없습니다. 이러한 이유로 로그 파일에서 불필요한 칼럼 17개를 삭제하고 필요한 칼럼 세 개만 추출하여 실제 처리할 모듈의 입력으로 사용하게 됩니다.
Clean ETL 모듈을 구현하려면 우선 Reducer의 필요 여부를 생각해봐야 합니다. Reducer는 Mapper의 출력을 입력으로 받는데 이때 Merge-Sort가 발생하고 전달하는 데이터 크기가 클수록 HDD I/O, 네트워크 I/O가 일어납니다. 또한 Key 정렬과 Group By가 일어나기 때문에 Reducer 사용 여부를 먼저 결정해야 합니다.
Clean ETL은 입력 파일을 그대로 읽어 Mapper에서 칼럼을 삭제하기만 하면 되므로 구현하기 쉽고 Reducer를 사용하지 않음으로써 불필요한 I/O를 줄여주어 성능도 보장받을 수 있습니다.
드라이버 작성하기
드라이버는 하둡 작업(Hadoop Job)을 실행하는 데 필요한 각종 설정 작업을 하는 자바 클래스입니다. main()
메서드가 구현되어 있는 클래스라 볼 수 있습니다. 모든 드라이버는 기본적으로 아파치 하둡에서 제공하는 클래스와 인터페이스를 구현해야 합니다. 그리고 실제 드라이버는 org.apache.hadoop.util.ToolRunner.run()
메서드를 통해 실행됩니다. 다음은 기본적으로 하둡 작업을 실행하는 드라이버가 갖춰야 할 최소 조건으로 구현한 Clean ETL Driver입니다.
package com.jbossug.hadoop.etl.clean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* 지정한 칼럼을 삭제하는 Clean ETL Driver.
*
* @author Edward KIM
* @version 1.0
*/
public class CleanDriver extends org.apache.hadoop.conf.Configured
implements org.apache.hadoop.util.Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new CleanDriver(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
Job job = new Job();
parseArguements(args, job);
job.setJarByClass(CleanDriver.class);
// Mapper Class
job.setMapperClass(CleanMapper.class);
// Output Key/Value
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// Reducer Task
job.setNumReduceTasks(0);
// Run a Hadoop Job
return job.waitForCompletion(true) ? 0 : 1;
}
private void parseArguements(String[] args, Job job) throws IOException {
for (int i = 0; i < args.length; ++i) {
if ("-input".equals(args[i])) {
FileInputFormat.setInputPaths(job, new Path(args[++i]));
} else if ("-output".equals(args[i])) {
FileOutputFormat.setOutputPath(job, new Path(args[++i]));
} else if ("-hdfs".equals(args[i])) {
job.getConfiguration().set("fs.default.name", args[++i]);
} else if ("-jobTracker".equals(args[i])) {
job.getConfiguration().set("mapred.job.tracker", args[++i]);
} else if ("-jobName".equals(args[i])) {
job.getConfiguration().set("mapred.job.name", args[++i]);
} else if ("-columnToClean".equals(args[i])) {
job.getConfiguration().set("columnToClean", args[++i]);
} else if ("-delimiter".equals(args[i])) {
job.getConfiguration().set("delimiter", args[++i]);
}
}
}
}
Reducer를 사용하지 않기 때문에 Reducer 개수를 0으로 지정했고 Reducer도 지정하지 않았습니다. -hdfs와 -jobTracker는 클러스터를 직접 지정하려고할 때 사용할 수 있는 옵션이며 실행하는 장비에 하둡 설정 파일인 core-site.xml, mapred-site.xml, hdfs-site.xml 파일에 하둡 클러스터의 Namenode가 제대로 지정되어 있다면 별도로 지정할 필요는 없습니다. 입력 경로는 -input 옵션으로 지정할 수 있으며 입력 경로를 복수로 지정하고자 한다면 ,로 구분하여 지정하면 됩니다.
Mapper 작성하기
Mapper에서는 Driver로 넘어온 columnToClean
매개변수를 가져와서 처리하는 핵심 역할을 합니다.
package com.jbossug.hadoop.etl.clean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 지정한 칼럼을 삭제하는 Clean ETL Mapper.
* 이 Mapper는 지정한 칼럼을 삭제하고 다시 Delimiter를 이용하여 조립하고 Context Write를 한다.
*
* @author Edward KIM
* @since 1.0
*/
public class CleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
private String delimiter;
private int columnToClean;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
delimiter = configuration.get("delimiter");
columnToClean = configuration.getInt("columnToClean", 0);
}
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] columns = value.toString().split(delimiter);
// 삭제할 칼럼 위치와 전체 칼럼 개수가 유효한지 확인한다.
if (!validate(context, columns)) {
return;
}
StringBuilder builder = new StringBuilder();
for (int index = 0; index < columns.length; index++) {
String column = columns[index];
// 삭제할 칼럼이 나타나면 무시한다.
if (index == columnToClean) {
continue;
}
builder.append(column).append(delimiter);
}
// 최종 문자열에서 제일 마지막에 붙은 delimiter를 제거하고 기록한다.
int outputLength = builder.toString().length();
context.write(NullWritable.get(),
new Text(builder.toString().substring(0, outputLength - 1)));
}
/**
* 삭제할 칼럼 위치와 전체 칼럼 개수가 유효한지 확인한다.
*
* @param context Mapper Context
* @param columns 칼럼
* @return 유효하다면 <tt>true</tt>
*/
private boolean validate(Context context, String[] columns) {
if (columnToClean > columns.length - 1) {
context.getCounter("Validation", "INVALID").increment(1);
return false;
}
context.getCounter("Validation", "VALID").increment(1);
return true;
}
}
Mapper 내부 코드에서 특이한 점은 Counter를 이용하여 validation 결과를 기록한다는 점입니다. 대용량 파일을 처리할 때 파일에는 다양한 노이즈가 있을 수 있으므로 ETL 모듈에서는 이러한 노이즈를 보고할 의무가 있습니다. 하둡의 Counter는 이러한 정보를 보고하기에 매우 적절합니다.
ETL 실행하기
이제 ETL 모듈을 실행해 보겠습니다. 우선 pom.xml 파일이 있는 소스 코드 프로젝트의 홈 디렉터리에서 mvn package 명령을 내려 .jar 파일로 패키징하고 출력 파일을 삭제합니다. 그런 다음 하둡 작업을 실행하면 됩니다.
#mvn package
#hadoop fs -rmr hdfs://192.168.1.2:9000/output.txt
#hadoop jar target/hadoop-etl-1.0.jar \
com.jbossug.hadoop.etl.clean.CleanDriver -input /input.txt \
-output /output.txt -cleanToColumn 3 \
-hdfs hdfs://192.168.1.2:9000 -jobTracker 192.168.1.2:9001 \
-delimiter , -jobName Clean
맥 OS X이나 리눅스 환경에서 간단하게 테스트하려고 한다면(로컬 실행 환경) -hdfs, -jobTracker 옵션을 제외하면 됩니다. 그러면 자동으로 로컬 모드로 동작하며 좀 더 편리하게 디버깅할 수 있습니다.
Aggregate ETL
Aggregate ETL은 하나 이상의 파일을 합치는 ETL입니다. 물론 하둡이 복수 입력을 받을 수 있기 때문에 크게 의미가 없을 수 있으나 경우에 따라서는 파일 여러 개를 하나로 합쳐서 타 시스템에 전달해야 하는 경우에는 필요합니다.
Aggregate ETL을 하둡으로 구현하려면 입력을 여러 개 받을 수 있는 것과 Reducer를 쓰지 않아도 되는 두 가지를 이해해야 하는 것이 핵심이라 할 수 있습니다.
드라이버 작성하기
Clean ETL 드라이버와 거의 똑같이 작성했지만 다른 특징은 입력 경로를 하나 이상 받도록 처리했다는 것입니다. Aggregate가 기본으로 하나 이상의 파일을 합치는 기능이 있기 때문에 경로 또한 하나 이상 받을 수 있어야 합니다. 물론 FileInputFormat.addInputPaths()
메서드나 FileInputFormat.setInputPaths()
메서드는 기본으로 쉼표로 분리된 리스트(comma separated list)를 받도록 구성되어 있지만 필자의 경우 다음과 같이 명시적으로 지정하는 것을 더 좋아합니다.
package com.jbossug.hadoop.etl.aggregate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* 하나 이상의 입력 파일을 받아서 합치는 Aggregation ETL Driver.
*
* @author Edward KIM
* @version 1.0
*/
public class AggregateDriver extends org.apache.hadoop.conf.Configured
implements org.apache.hadoop.util.Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new AggregateDriver(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
Job job = new Job();
parseArguements(args, job);
job.setJarByClass(AggregateDriver.class);
// Mapper Class
job.setMapperClass(AggregateMapper.class);
// Output Key/Value
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// Reducer Task
job.setNumReduceTasks(0);
// Run a Hadoop Job
return job.waitForCompletion(true) ? 0 : 1;
}
private void parseArguements(String[] args, Job job) throws IOException {
for (int i = 0; i < args.length; ++i) {
if ("-input".equals(args[i])) {
String files = args[++i];
for (String file : files.split(",")) {
FileInputFormat.addInputPath(job, new Path(file));
}
} else if ("-output".equals(args[i])) {
FileOutputFormat.setOutputPath(job, new Path(args[++i]));
} else if ("-hdfs".equals(args[i])) {
job.getConfiguration().set("fs.default.name", args[++i]);
} else if ("-jobTracker".equals(args[i])) {
job.getConfiguration().set("mapred.job.tracker", args[++i]);
} else if ("-jobName".equals(args[i])) {
job.getConfiguration().set("mapred.job.name", args[++i]);
}
}
}
}
Mapper 작성하기
Aggregate ETL의 Mapper가 실제로 하는 일은 입력을 그대로 기록하는 것입니다. 이렇게 하면 파일을 여러 개 읽더라도 하둡은 지정한 출력 경로에 Mapper 개수만큼 파일을 분할하여 기록합니다(입력 파일이 커서 Mapper가 400개 동작하였다면 최종 출력 파일도 파일 400개로 분할됩니다). 최종 출력 파일은 하나로 합치게 되면 결국 우리가 원하는 파일 결합의 결과를 얻을 수 있습니다.
package com.jbossug.hadoop.etl.aggregate;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 하나 이상의 입력 파일을 받아서 합치는 Aggregation ETL Mapper.
* 이 Mapper는 입력을 그대로 다시 출력한다.
*
* @author Edward KIM
* @since 1.0
*/
public class AggregateMapper
extends Mapper<LongWritable, Text, NullWritable, Text> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(NullWritable.get(), value);
}
}
ETL 실행하기
이제 ETL 모듈을 실행해 보겠습니다. 우선 pom.xml 파일이 있는 소스 코드 프로젝트의 홈 디렉터리에서 mvn package 명령을 내려 .jar 파일로 패키징하고 출력 파일을 삭제합니다. 그런 다음 하둡 작업을 실행하면 됩니다. 입력 파일을 지정하는 -input에 ,로 구분할 수 있는 입력 파일이 한 개 이상임을 주의하기 바랍니다.
#mvn package
#hadoop fs -rmr hdfs://192.168.1.2:9000/output.txt
#hadoop jar target/hadoop-etl-1.0.jar \
com.jbossug.hadoop.etl.aggregate.AggregateDriver \
-input /input1.txt,/input2.txt -output /output.txt \
-hdfs hdfs://192.168.1.2:9000 -jobTracker 192.168.1.2:9001 \
-jobName Aggregate
이제 합쳐진 결과 파일을 타 시스템으로 전달하기 위해 다음과 같이 결과를 하나의 파일로 getmerge하면 됩니다.
#hadoop fs –getmerge hdfs://192.168.1.2:9000/output.txt output.txt
#scp output.txt hadoop@192.168.100.2:/data
마무리
이번 글에서는 Mapper만 이용하여 고성능 ETL을 간단하게 구현해보았습니다. 코드 난이도로 보았을 때 상당히 쉬운 수준이지만 실제로 구현하기 전까지 아이디어를 얻는 시간은 상당합니다. 하둡의 맵리듀스 매커니즘을 이해하는 데 상당한 도움이 되기 때문에 첫 회에서는 몸풀기용 ETL을 설명했습니다. 다음 회에는 좀 더 어려운 ETL 모듈을 구현해겠습니다.
참고 자료
- 아파치 하둡
- 리눅스와 아파치 하둡을 사용한 클라우드 컴퓨팅
- 하둡을 이용한 분산 데이터 처리, Part 1: 시작
- 하둡을 이용한 분산 데이터 처리, Part 2: 추가 주제
- 아파치 하둡의 피그
[오픈 developerWorks]는 여러분이 직접 필자로 참가하는 코너입니다. IBM developerWorks를 통해 공유하고 싶은 지식이 있으신 분들은 원고 기획안을 접수해주세요. 채택되신 분께는 소정의 원고료를 드립니다.
