Hadoop のための Oozie ワークフロー・スケジューラー

入門

ビッグ・データがそのままの形式で、Hadoop を使用してデータ処理タスクを実行する開発者のデータ要件を満たすことは、ほぼありません。実際にデータ処理を行うジョブを開始する前に、さまざまな ETL (Extract/Transform/Load: 抽出/加工/書き込み) や前処理が必要になるのが一般的です。このプロセスを自動化したり、これらの作業を時間が経過した後でも新しいコードやステップを作成せずに再利用できる、繰り返し実行可能な単位およびワークフローへと体系化したりするのに役立つのが、Oozie フレームワークです。この記事では、Oozie を使用してさまざまなタイプのワークフローを作成する方法について説明します。

Sherif Sakr, Senior Research Scientist, National ICT Australia

Sherif Sakr 的照片Sherif Sakr 博士は、オーストラリアのシドニーにある NICTA (National ICT Australia) のソフトウェア・システム・グループに所属する上級研究科学者です。また、ニューサウス・ウェールズ大学 (UNSW) の The School of Computer Science and Engineering (CSE) で共同上級講師も勤めています。彼は 2007年にドイツのコンスタンツ大学でコンピューター・サイエンスの博士号を取得しており、同じくコンピューター・サイエンスの学士号と修士号は、エジプトのカイロ大学で取得しました。2011年には、米国ワシントン州レドモンドにある Microsoft Research の eXtreme Computing Group (XCG) で客員研究科学者として迎えられました。Dr.Sakr は 2012年にアルカテル・ルーセント社ベル研究所の技術研究スタッフとなりました。



2014年 2月 13日

Oozie とは何か?

Apache Oozie は、「ワークフローを作成し、ジョブ間での調整管理を行う」というプロセスを単純化する、Java 技術をベースにしたオープンソース・プロジェクトです。基本的には Oozie を使用することにより、複数のジョブをシーケンシャルに組み合わせて 1 つの論理作業単位にすることができます。Oozie フレームワークを使用するメリットの 1 つは、Oozie が Apache Hadoop スタックに完全に統合されていることから、Apache MapReduce、Pig、Hive、Sqoop のための Hadoop ジョブをサポートしていることです。また Oozie を使用すると、システム固有のジョブ (Java プログラムなど) をスケジューリングすることもできます。つまり、Hadoop 管理者は Oozie を使用することで、さまざまな個々のタスクの処理や、サブワークフローの処理さえも組み合わせることが可能な、複雑なデータ変換を構築することができます。そのため、複雑なジョブを詳細に制御することができ、指定した周期でそれらのジョブを繰り返し簡単に実行することができます。

実際には、Oozie のジョブには以下の 3 つのタイプがあります。

  • Oozie ワークフロー・ジョブ — 無閉路有向グラフとして表現され、実行されるアクションのシーケンスを指定します。
  • Oozie コーディネーター・ジョブ — 指定の時刻になったり、データが用意されたりするとトリガーされる Oozie ワークフロー・ジョブです。
  • Oozie バンドル — 複数のコーディネーター・ジョブとワークフロー・ジョブをパッケージ化し、これらのジョブのライフサイクルを容易に管理できるようにします。

Oozie はどのように動作するのか?

Oozie ワークフローは、無閉路有向グラフ (DAG) に配置されたアクションの集合です。このグラフには、コントロール・ノードとアクション・ノードという 2 つのタイプのノードを含めることができます。「コントロール・ノード」は、ジョブの実行順序を規定するために使用され、ワークフローの開始ルールと終了ルールを規定するとともに、ワークフローに含め得るデシジョン・ポイント (フォーク・ノードや、ジョイン・ノード) によってワークフロー実行パスを制御します。「アクション・ノード」は、タスクの実行をトリガーするために使用されます。特に、アクション・ノードとなり得るものには、MapReduce ジョブ、Pig アプリケーション、ファイルシステム・タスク、Java アプリケーションなどがあります (シェル・アクションと ssh アクションは非推奨となっています)。

Oozie は最初から Hadoop スタックに統合されており、あらゆるタイプの Hadoop ジョブをサポートしています。具体的には Oozie はワークフロー・アクションをトリガーしますが、実際のタスクを実行するのは Hadoop MapReduce です。こうしたことから、Oozie はロード・バランシングやフェイルオーバー等のための既存の Hadoop メカニズムを活用できるようになっています。Oozie はコールバックとポーリングを使用してタスクの完了を検出します。Oozie はタスクを開始すると、そのタスクに対してコールバック用の固有の HTTP URL を提供し、タスクが完了すると、その URL に通知します。タスクがコールバック URL の呼び出しに失敗した場合には、Oozie はタスクの完了をポーリングすることができます。図 1 は Oozie ワークフローの例を示しており、ここでは 6 つのアクション・ノード (Pig スクリプト、3 つの MapReduce ジョブ、Java コード、HDFS タスク) と 5 つのコントロール・ノード (Start、Decision コントロール、Fork、Join、End) を組み合わせています。Oozie ワークフローは、パラメーター化することもできます。その場合、ワークフロー・ジョブをサブミットする際に、パラメーターの値を指定する必要があります。適切なパラメーターを使用すると、同一のワークフロー・ジョブをいくつか同時に実行させることができます。

図 1. Oozie ワークフローの例
Oozie ワークフローの例を示す図

実際には、Oozie ワークフローを一定の周期で、他の条件 (特定のデータが利用可能かどうか、あるいは他のイベントやタスクが完了したかどうかなど) との兼ね合いで実行しなければならない場合があります。そうした場合には、Oozie コーディネーター・ジョブを使用することにより、ユーザーはデータ、時刻、イベントについて記述する述語の形でワークフロー実行トリガーをモデル化することができます。このモデルでは、これらの述語が満たされるとワークフロー・ジョブが開始されます。また Oozie コーディネーターは、実行が開始されたワークフローの結果に依存する複数のワークフローを管理することもできます。実行が開始されたワークフローの出力は、次に実行されるワークフローの入力になります。このチェーンはデータ・アプリケーション・パイプラインと呼ばれます。

Oozie ワークフローを定義する言語は XML ベースであり、Hadoop Process Definition Language (Hadoop プロセス定義言語) と呼ばれます。Oozie にはジョブをサブミットするためのコマンドライン・プログラムが用意されています。このコマンドライン・プログラムにより、REST を使用して Oozie サーバーとやりとりすることができます。Oozie クライアントを使用してジョブをサブミット (つまり実行) するには、クライアントのパラメーターとして HDFS 内の workflow.xml ファイルのフル・パスを Oozie に対して指定します。Oozie にはグローバル・プロパティーの概念はありません。そのため、ジョブを実行するたびにその実行の一環として jobtracker や namenode を始めとするすべてのプロパティーをサブミットする必要があります。Oozie は、状態を格納するために RDBMS を使用します。


Oozie の実際の動作

Oozie ワークフローを使用して反復ジョブを実行してみましょう。Oozie ワークフローは無閉路有向グラフを表す XML ファイルとして記述されます。では、2 つの MapReduce ジョブのチェーンである以下の単純なワークフローの例を見てみましょう。1 番目のジョブは初期のデータ取り込み (ingestor) を実行し、2 番目のジョブは指定されたタイプのデータをマージ (merge) します。

リスト 1. 単純な Oozie ワークフローの例
<workflow-app xmlns='uri:oozie:workflow:0.1' name='SimpleWorkflow'>
	<start to='ingestor'/>
	<action name='ingestor'>
		</java>
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<configuration>
				<property>
					<name>mapred.job.queue.name</name>
					<value>default</value>
				</property>
			</configuration>
			<arg>${driveID}</arg>
		</java>
		<ok to='merging'/>
		<error to='fail'/>
	</action>
	<fork name='merging'>
		<path start='mergeT1'/>
		<path start='mergeT2'/>
	</fork>
	<action name='mergeT1'>
		<java>
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<configuration>
				<property>
					<name>mapred.job.queue.name</name>
					<value>default</value>
				</property>
			</configuration>
			<arg>-drive</arg>
			<arg>${driveID}</arg>
			<arg>-type</arg>
			<arg>T1</arg>
		</java>
		<ok to='completed'/>
		<error to='fail'/>
	</action>
	<action name='mergeT2'>
		<java>
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<configuration>
				<property>
					<name>mapred.job.queue.name</name>
					<value>default</value>
				</property>
			</configuration>
			<main-class>com.navteq.assetmgmt.hdfs.merge.MergerLoader</main-class>
			<arg>-drive</arg>
			<arg>${driveID}</arg>
			<arg>-type</arg>
			<arg>T2</arg>
		</java>
		<ok to='completed'/>
		<error to='fail'/>
	</action>
	<join name='completed' to='end'/>
	<kill name='fail'>
		<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>
	<end name='end'/>
</workflow-app>

この単純なワークフローは、ingestormergeT1mergeT2 という 3 つのアクションを定義しています。各アクションは MapReduce ジョブとして実装されています。図 2 に示すように、このワークフローは Start ノードで開始され、このノードが制御を ingestor アクションに移します。ingestor ステップが完了すると、Fork コントロール・ノードが呼び出され、mergeT1mergeT2 の実行を並行して開始するアクションが実行されます。両方のアクションが完了すると、Join コントロール・ノードが呼び出されます。Join ノードでの処理が正常に完了すると、制御が End ノード に移され、プロセスを終了するステップが実行されます。<job-tracker> エンティティーと <name-node> エンティティーは、Hive ジョブのスクリプトを実行するために Hive ジョブが接続する対象となるサーバーを表しています。

図 2. リスト 1 のワークフローの説明
Oozie ワークフローの例を示す図

今度は、Hive ジョブを含む別の Oozie ワークフローの例を見てみましょう。

リスト 2. Hive ジョブを含む Oozie ワークフロー
<workflow-app xmlns='uri:oozie:workflow:0.1' name='Hive-Workflow'>
	<start to='hive-job'/>
	<action name='hive-job'>
		<hive xmlns='uri:oozie:hive-action:0.4'>
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<prepare>
				<delete path='${workflowRoot}/output-data/hive'/>
				<mkdir path='${workflowRoot}/output-data'/>
			</prepare>
			<job-xml>${workflowRoot}/hive-site.xml</job-xml>
			<configuration>
				<property>
					<name>oozie.hive.defaults</name>
					<value>${workflowRoot}/hive-site.xml</value>
				</property>
			</configuration>
			<script>hive_script.q</script>
			<param>JSON_SERDE=${workflowRoot}/lib/hive-serdes-1.0-SNAPSHOT.jar</param>
			<param>PARTITION_PATH=${workflowInput}</param>
			<param>DATEHOUR=${dateHour}</param>
		</hive>
		<ok to='end'/>
		<error to='fail'/>
	</action>
	<kill name='fail'>
		<message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>
	<end name='end'/>
</workflow-app>

このワークフローでは <hive xmlns='uri:oozie:hive-action:0.4'> というノードにより、このアクションが Hive アクションであることを特定しています。<job-xml> エンティティーは、Hive の構成ファイルを指定するために使用されています。

では最後に、Oozie で反復ワークフローをスケジューリングする別の例を見てみましょう。

リスト 3. Oozie による反復ワークフロー

リスティングを見るにはここをクリック

リスト 3. Oozie による反復ワークフロー

<coordinator-app name='Hive-workglow' frequency='${coord:hours(1)}'
start='${jobStart}' end='${jobEnd}'
timezone='UTC'
xmlns='uri:oozie:coordinator:0.1'>
<datasets>
	<dataset name='InputData' frequency='${coord:hours(1)}'
initial-instance='${initialDataset}' timezone='America/Los_Angeles'>
		<uri-template>
			hdfs://hadoop1:8020/user/flume/InputData/${YEAR}/${MONTH}/${DAY}/${HOUR}
		</uri-template>
		<done-flag></done-flag>
	</dataset>
</datasets>
<input-events>
	<data-in name='InputData ' dataset='Data'>
		<instance>${coord:current(coord:tzOffset() / 60)}</instance>
	</data-in>
	<data-in name='readyIndicator' dataset='tweets'>
		<instance>${coord:current(1 + (coord:tzOffset() / 60))}</instance>
	</data-in>
</input-events>
<action>
	<workflow>
		<app-path>${workflowRoot}/ Hive-Workflow.xml</app-path>
		<configuration>
			<property>
				<name>workflowInput</name>
				<value>${coord:dataIn('InputData')}</value>
			</property>
			<property>
				<name>dateHour</name>
				<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), tzOffset, 'HOUR'), 'yyyyMMddHH')}</value>
			</property>
		</configuration>
	</workflow>
</action>
</coordinator-app>

この例では、前の例の Hive ワークフローを、coord:hours(1) メソッドを使用して 1 時間ごとに実行するように構成しています。ジョブの開始時刻と終了時刻は jobStart コードと jobEndvariables コードを使用して指定しています。datasets エンティティーは入力データセットの場所を指定しています。この場合には InputData というデータセットがあり、このデータセットが 1 時間ごとに更新されることを frequency によって指定しています。Hive ワークフローを実行するごとの入力データセットのインスタンスは、初期インスタンスが dataset によって指定され、以降は入力データセットのインスタンスが別に存在するようになります。YEARMONTHDAYHOUR はデータセットの URI テンプレートをパラメーター化するための変数です。done フラグは、データセットの生成終了を判断するファイルを指定します。


まとめ

Hadoop ユーザーが、ビッグ・データ処理タスクの実行を連結して自動化し、1 つの定義済みワークフローにする作業を支援する機能は、実際のプラクティスでは非常に便利な機能です。この記事では、ワークフローの作成と Hadoop ベースのジョブ間の調整を行うプロセスを単純化する Apache のオープンソース・プロジェクト、Oozie について紹介しました。ただし、こうした目的の実現を支援するプロジェクトは Oozie のみではありません。他にも、Azkaban (LinkedIn が作成してオープンソース化したもの)、Luigi (Python ベースのワークフロー・エンジン)、Cascading (Java、JRuby、Clojure などの JVM ベース言語をサポートするもの) などがあります。皆さんも Oozie とその多様な機能や特徴を活用する準備が整ったら、他の情報やリソースについて記載した「参考文献」を参照してください。

参考文献

学ぶために

製品や技術を入手するために

議論するために

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Open source
ArticleID=961382
ArticleTitle=Hadoop のための Oozie ワークフロー・スケジューラー
publish-date=02132014