IBM InfoSphere Information Server バージョン 8.1
本バージョン 8.1は 2006年10月にリリースされた バージョン 8.0 に対して、数多くの新機能の追加や改良が行われており、日本語対応の強化も重要な項目の1つです。日本語対応の強化以外に主に以下のような機能が拡張されています。
- デプロイメントの向上
- ネイティブ 64 bit サポート
- 拡充されたアプリケーション、データベース、ファイル・システムへの接続
- 情報のサービス化機能の強化
InfoSphere Information Server 8.1はシステムに散在する複雑で種類の異なる情報から、より高い価値を引き出すためのソフトウェア・プラットフォームです。データを加工・変換するETL (Extract, Transform, Load)機能を提供する DataStage、データを信頼できる情報とするためのクレンジング機能である QualityStage、データを整理・把握できるようにすることで情報の活用を促進する Information Analyzer や Business Glossary といった機能群から構成されており、InfoSphere Information Severは それら機能が有機的に結合されたスイート製品(図1)です。
図 1
その中でも InfoSphere DataStageは Information Serverのコアとなる製品であり、簡単で使いやすいGUI(図2)上で ステージと呼ばれる部品を組み合わせることにより ETLジョブを簡単に作成・実行できます。DB2やWebSphere MQ, Oracle といったデータソースだけでなく、InfoSphere Federation Server、 InfoSphere Classic,、DataMirrorといった 他のミドルウェア製品とも連携したシステムを構築できます。作成するETLジョブは 実行エンジンの違いにより サーバージョブ と パラレルジョブがあります。
図 2
本稿では、パラレルジョブの実行エンジンであるパラレルエンジンについてこれ以降の章で前編、後編に分けて解説していきます。前編では、パラレルエンジンの概略とサンプル・ジョブについて、後編ではサンプル・ジョブが解釈・実行される際の詳細とチューニング方法の例を解説します。
DataStage Enterprise Editionは DataStageで作成されたパラレルジョブを実行するエンジンです。通称パラレルエンジンと呼ばれています。パラレルエンジンはDataStageのジョブだけでなく、Information Analyzerによる分析情報作成時や Information Service Directorで Deployされる Web Servicesの実行時にも使用されており、まさにInformation Server Runtimeにおける中核コンポーネントと言えます。
パラレルエンジンはその名のとおり、ジョブをパラレルに実行することをサポートしています。複数のCPUに複数の処理ノードを配置し、実行時の並列性を最適化することが可能です。実行時の並列性を最適化することは通常は簡単ではありませんが、パラレルエンジンの場合、ジョブの設計・実装と、ジョブの実行環境の定義が独立しており、ジョブ設計や実装時に何個のノードやCPUで実行されるのかといったことを考慮する必要がありません。そのため効率よくジョブを開発することが可能になります。
システムを構成する場合、並列実行だけでなく、スケーラブルに実行できることも重要です。パラレルエンジンでは、ジョブを1つのかたまりとして実行するのではなく、ジョブの内部構成要素である オペレーター単位でノードに配置されます。パラレルジョブに使用されるステージでは通常1つもしくは複数のオペレーターで構成されています。(注:ステージによってはオペレーターで構成されていないものもあります)
オペレーターという、より細分化された単位での実行管理のため、システムの資源をより有効に利用でき、そのため負荷に応じて追加のCPUやDiskを追加し、ジョブの実行環境の構成を再定義することで簡単にスケールアップすることが可能です。
Pipeline Parallelism と Partition Parallelism
パラレルエンジンの性能を最大限に活用するためには Pipeline Parallelismと Partition Parallelismの両方の動作を理解することが重要です。
図 3はPipeline Parallelismの動作を示したものです。図3のように TRANSFORM、ENRICH、LOADの3つの処理部(ステージもしくはオペレーター)でジョブが構成されている場合、3つの処理部は各々独立したプロセスとして同時に実行されます。入力となるSourceからのデータは プロセスからプロセスをあたかもベルトコンベアのように流れていき、前段の処理部が処理を完了することを待つことなく後段の処理部が処理を開始できます。Pipelineの各段が逐次並行動作するため、各処理部でデータを貯める必要はなく、結果としてディスクの使用量が削減されます。(ただしSortステージのように入力データがすべてそろわないと実行に意味がない処理ではステージ実行中に一時的にデータが貯められます)
図 3
Pipeline ParallelismはCPUの利用効率を高めるために重要ですが、大量なデータを効率よく処理するためにはPartition Parallelismによるスケーラビリティを考慮することが有効な手段です。パーティション化はDatabaseなどでよく使われる手法ですが、複数の処理ノードにおいて各々独立して処理できるよう、処理対象である入力データをサブセットに分けて処理します。そのため複数の処理ノードを複数のCPUに配置し、各々のノードが処理するデータ量がどの時点においても偏りが少ない場合、CPUの数の増加にほぼ比例したスケーラブルなシステムを構成できます。逆の言い方をすれば、各ノードが処理するデータ量に偏りがある場合、システムを効率よく利用できません。そのため、入力データをどのようなルールでサブセット化(パーティション化)するか考慮する必要があります。
Information Serverではデータのパーティション化のルールを指示するものをパーティショナーと呼んでいます。パーティショナーがサポートしているパーティション・タイプとして、自動、DB2、ハッシュ、モジュラス、ラウンドロビン、ランダム、全体、同一、範囲 があります。データのパーティション化とは逆にパーティション化されたデータを1つの集める必要がある場合もあります。たとえば最終的に1つのファイルに書き込む場合ときなどです。これをデータのコレクティングと呼びますが、データのコレクティングのルールを指示するものはコレクターと呼ばれており、ラウンドロビン、順次、ソート・マージ、自動の4つのタイプが用意されています。
DataStage Enterprise Editionでは構成されているノードに応じて Pipeline Parallelism と Partition Parallelismの両方が自動的に適用されます。図4のように、TRANSFORM, ENRICH, LOADの各処理部はPipelineとしてつながっているだけでなく、各々の処理部のパーティション・タイプに応じて、(必要であれば)再度パーティション化され、データがノードを渡り歩くように処理されていきます。
図 4
Pipeline ParallelismとPartition Parallelismの動作の理解のために、実際のジョブの例を見ていきましょう。分散並列処理技術で話題のMapReduceを元にした MapReduceジョブを例とします。
MapReduceでは、Map と Reduceという大きく分けて2つのステップで構成されます。MapReduceの説明ではテキスト中の単語の数を数える Word Countの例がよく使われていますが、ここでも Word Count のための MapReduceジョブとしています。入力となるデータは本来文章そのもののテキストファイルとすべきですが、今回のサンプルでは単語分割のロジックなどを省くため、入力は単語分割され1行に1単語のテキストファイルとしています。他にも正確にはMapReduceを実現していない点もありますが、本稿ではパラレルジョブの動作の説明を中心とするため、ご容赦ください。(MapReduceは大変興味深い技術ですが、本稿では詳しい紹介はいたしません。MapReduceで検索すると技術的詳細を説明した資料がいろいろ見つかりますので、興味のある方はご参照ください。)
Map
Mapでは、入力となる単語に対して 1を値として割り当てます(図5)。入力が単語の配列とした場合、Mapの出力は 単語(key)と値(count)のペアの配列となります。
図 5
Reduce
Reduceでは同じkeyを持つものをまとめます。Word Countの場合には 同じkey (この場合単語)のcountを足し合わせます(図6)。出力形式は入力形式と同じ 単語(key)と値(count)のペアの配列となりますが、countは単語の出てきた回数となっています。
図 6
MapReduceジョブでは、Map部分に Column Generator ステージを、 Reduce部分に Aggregator ステージを使用することとします。図 7はMapReduceジョブ全体を示したものです。
図 7
各ステージの主な設定は以下のとおりです。個々のステージの設定方法などについては 「デザイナー・クライアント・ガイド」 もしくは 「パラレル・ジョブ開発者ガイド」 を参照ください。
InputText (Sequential File ステージ)の設定
InputTextには Sequential File ステージを使用します。InpuTextでSequential File ステージの省略値からの追加・変更は以下のものがあります。
- 「出力」ページ/「列」タブ - 列として "word"を追加。SQLタイプ VarChar、長さ 32、NULL可能 いいえ。
- 「出力」ページ/「プロパティ」タブ - プロパティ「ソース」に入力するファイル名を指定。(例: C:\JobSample\inputtext.txt)
Map (Column Generator ステージ)の設定
Mapには Column Generator ステージを使用します。MapでColumn Generator ステージの省略値からの追加・変更は以下のものがあります。
- 「入力」ページ/「列」タブ - 列として "word"を追加。SQLタイプ VarChar、長さ 32、NULL可能 いいえ。
- 「入力」ページ/「列」タブ - 列として "word"、"count"を追加。"word"の設定は SQLタイプ VarChar、長さ 32、NULL可能 いいえ。"count"の設定はSQL タイプ Integer、NULL可能 いいえ。
- 「ステージ」ページ/「プロパティ」タブ - プロパティ「オプション」の「生成する列」に"count"を指定。
- 「出力」ページ/「マッピング」タブ - 図8のように、マッピングします。
- 「列メタデータ編集」/「パラレル」タブ - 「出力」ページ/「列」タブで、列名"count"の左の"2"の部分(図9 赤丸部分) をダブルクリックし「列メタデータ編集」パネルを表示します。プロパティ「ジェネレータ」の「タイプ」をサイクルに、「初期値」を1、「増加」を0に指定します。
図 8
図 9
Reduce (Aggregator ステージ)の設定
Reduceには Aggregator ステージを使用します。ReduceでAggregator ステージの省略値からの追加・変更は以下のものがあります。
- 「入力」ページ/「列」タブ - 列として "word"、"count"を追加。"word"の設定は SQLタイプ VarChar、長さ 32、NULL可能 いいえ。"count"の設定はSQL タイプ Integer、NULL可能 いいえ。
- 「出力」ページ/「列」タブ - 列として "word"、"count"を追加。"word"の設定は SQLタイプ VarChar、長さ 32、NULL可能 いいえ。"count"の設定はSQL タイプ Integer、NULL可能 いいえ。
- 「ステージ」ページ/「プロパティ」タブ - 図10を参考に設定してください。
- 「出力」ページ/「マッピング」タブ - 図11のように、マッピングします。
図 10
図 11
CountResult (Sequential File ステージ)の設定
CountResultには Sequential File ステージを使用します。CountResultでSequential File ステージの省略値からの追加・変更は以下のものがあります。
- 「入力」ページ/「列」タブ - 列として "word"、"count"を追加。"word"の設定は SQLタイプ VarChar、長さ 32、NULL可能 いいえ。"count"の設定はSQL タイプ Integer、NULL可能 いいえ。
- 「入力」ページ/「プロパティ」タブ - プロパティ「ソース」に出力するファイル名を指定。
- 「入力」ページ/「フォーマット」タブ - プロパティ「フィールド・デフォルト」の「引用符」に"なし"を指定。
MapReduceジョブは入力ファイルを読み込み、出力ファイルに結果を書き込むジョブです。実行するためには設定したジョブを保存し、コンパイルする必要があります。コンパイル時にエラーがでなければ実行可能です。入力ファイルとして図12の内容のファイルを用意し、InputTextで指定したファイル名 (例: C:\JobSample\inputtext.txt)で保存しておきます。
図 12
ジョブを実行すると出力ファイルには図13のような結果が得られます。
図 13
各単語ごとに表れた回数が計算されており、単純な仕組みですが、MapReduceとして機能していることが確認できます。
Windows版Information Serverでは、CPUを1つだけ持つServerにインストールした場合、通常の構成はもっともシンプルな1ノード構成となります。1ノード構成でもPipeline Parallelismは自動的に適用されますが、Partition Parallelismを機能させるためには複数ノードの構成が必要です。処理ノードの構成は構成ファイルを編集することにより変更できます。構成ファイルを編集する画面は、DataStageデザイナーの「ツール」メニューから「構成」メニューを選択することにより表示されます。標準の構成を変更するには、"default"を選択してください。
それでは、1ノードの構成を2ノードに変更してみましょう。 nodeの定義が書かれている部分はエディタになっていますので直接編集可能です。標準のインストール時には "node1"しかありませんが、図14のように "node2"を追加します。編集が終わると、「保存」ボタンを押し、「構成を保存」を選択することで、保存できます。
図 14
2ノードに変更した後、同じMapReduceジョブを実行してみましょう。出力されるファイルは、行の順番は変わっているかもしれませんが各単語に対する結果は同じになっていることが確認できます。今回の入力ファイルのように単語数が少ない場合、ノード数を増やしても実行時間などの効果はありませんが、入力データの数が膨大な場合、複数ノードでのPartition Parallelismの効果が顕著に確認できます。
- [1] デザイナー・クライアント・ガイド
- [2] パラレル・ジョブ開発者ガイド