クラウド関連の Big data の問題を MapReduce で解決する

MapReduce とクラウド・コンピューティングの組み合わせが大量のデータを処理する上でいかに理想的であるかを探る

複雑な計算を駆使した結果を出すには、より多くの物理リソースと仮想リソースにアクセスできなければなりません。しかし、組織内にグリッド・システムを構築するには、リソース、ロジスティクス、そして技術が障壁となる場合があります。しかもその一部が政治的な問題である場合もあります。そのような場合に有効なのが、クラウド・コンピューティングです。また、クラウド・コンピューティングを MapReduce 機能と組み合わせると、例えば 2 つの数値をどこで加算するかといったことがトランスペアレントになり、重要でなくなるため、大量の Big data の計算を行う上で最適です。

Noah Gift, Associate Director Engineer, AT&T Interactive

author Noah GiftNoah Gift は、AT&T Interactive で経験豊富な技術リーダーを務めるソフトウェア開発者です。彼は興味深い問題を解決する手段として、Python/Iron Python、Erlang、F#、C#、JavaScript などのさまざまな言語を使用しています (Caltech、Disney Feature Animation、Sony Imageworks、および Weta Digital での勤務経験があります)。Python Software Foundation のメンバーである彼は、developerWorks に多数の記事を書いており、『Python For Unix and Linux System Administration』の共著者でもあります。カリフォルニア州立工科大学サンルイス・オビスポ校で栄養科学の学位を取得し、カリフォルニア州立大学ロサンゼルス校でコンピューター情報システムの修士号を取得しました。ビジネス分析、金融、起業家精神を専攻しているカリフォルニア州立大学デービス校では MBA 候補となっています。余暇は、ピアノ曲の作曲とマラソンで過ごしています。彼の Web サイトにアクセスしてください。また、Twitter でフォローすることも、Web で彼の書いた記事を読むこともできます。



2010年 11月 08日

MapReduce 関連の用語

mapper: 作業単位を実行する関数。2 つの数値を加算するだけの単純な関数である場合もあります。この関数が返すのは、IP アドレスや単語のようなキーと、カウントのような値です。

reducer: 一連の要素のすべてを結合する関数です。

分散ファイルシステム: データを処理するすべてのマシンがアクセスできる共有ファイルシステムです。

ミクロ経済学が私たちに教えているのは、あるシステムにおいて実行しなければならないアクティビティーの大半に、そのシステムに属している大多数の存在が関わっているシステムよりも、何らかのものに特化したシステムのほうが生産性に優れているということです。別の言葉に置き換えると、ある特定の仕事を専門とする人に比べ、何でもできる器用な人は 1 つひとつの仕事における生産性では劣ります。これは、比較優位という言葉で知られています。つまり、個人がある特定のサービスの生産において、他のサービスの生産よりも優れている場合、その個人はそのサービスの生産において優位であるということです。そして、特化は特定のスキルの獲得を促進することにもなります。(この比較優位に関する説得力のある説明としては、Peace Corps のボランティアがネパール滞在中に、ヤギの食肉処理から目覚まし時計の修理に至るまで、ありとあらゆる仕事をこなすことができる多芸な料理人 Birkhaman を雇ったときの逸話が『Principles of Microeconomics』(Robert Frank、Ben Bernanke 共著) に載っています。ネパールでは、能力が低い労働者であっても組み合わせて雇うことで、幅広い仕事をこなしているのです。)

クラウド・コンピューティングは、比較優位の原則を文字通り実践している例です。この記事では、元々は処理を並列化することの複雑さを抽象化するために設計された MapReduce が (大量のデータが関わる問題に対処する場合には、なおのこと)、いかにクラウド・コンピューティングにとって理想的なプログラミング・パラダイムであるかを探ります。

クラウド・コンピューティングは、MapReduce の抽象化をベースにすることで完全なものを構築することができます。この MapReduce の抽象化は、2 つの数値をどこで加算するかといったことをトランスペアレントかつ重要でないものにすることで実現されます。その例に話題を移す前に、まずは MapReduce が成功した理由について説明します。

クラウドで MapReduce を使用する理由

MapReduce プログラミング・モードは Google で開発されました。MapReduce が機能する仕組みは、Google のエンジニアたちによって発行された記事「MapReduce: Simplified Data Processing on Large Clusters」にわかりやすく説明されています。そしてこの記事がきっかけとなって、2004年から今日に至るまで、多くの MapReduce のオープンソース実装が生まれることになりました。

MapReduce システムが成功した理由の 1 つは、高度の並列性が必要なコードを作成するための単純なパラダイムとなるように設計されているためです。この設計は、Lisp やその他の関数型言語が持つ関数型プログラミングの側面に着想を得たものです。

ここで本題に移り、MapReduce とクラウド・コンピューティングがなぜ抜群の相性かと言うと、MapReduce の主なセールス・ポイントは、処理を並列化するための動作 (つまり並列プログラミングが機能する仕組み) を開発者に対して抽象化できることです。

数千台ものマシンが転がっている企業で働いているとしたら言うことありませんが、そのような例はめったにありません。組織に余力があるとしても、その組織の中にグリッドを構築するとなると、技術的な障害や、政治的な障害、さらにはロジスティクスに関する障害が多数存在するのが通常です。

分散コンピューティングにおける Erlang

クラウド・コンピューティングは、Erlang をはじめとするさまざまなものに光を当ててきました。Erlang は、開発者がオペレーティング・システムを記述する場合に使用する特性の多くを共有するという点で、他には類を見ないプログラミング言語です。また、そうした特殊な性質により、大規模な分散システムを構築するには理想的な言語となっています。したがって、多くの分散アルゴリズムの「クラウド」実装が Erlang で作成されているのは当然のことと言えるでしょう。CouchDB や Disco もその例です。Erlang は、クラウドという言葉が作り出される前から、いわゆるクラウド・システムの構築に使用されてきました。

そこで急遽、当たり前の発想というだけでなく、従わざるを得ない選択肢となるのがクラウド・コンピューティングです。

クラウドでは、任意の数のマシンをプロビジョニングして MapReduce ジョブを実行するスクリプトを作成することができます。しかもクラウドで発生する支払いは、各システムを使用した時間の分だけです。10 分間しか使用しないとしても、あるいは使用時間が 10 ヶ月に及ぶとしても、使用した分だけ払うことに変わりはありません。

このパラダイムの好例は、Yelp (Real people. Real reviews® という、国内のビジネスを評価するサイト) にありました。この企業の最近のエンジニアリング・ブログに、「People Who Viewed This Also Viewed... (このサイトを見た人は、このサイトも見ています…)」という機能を実現するために、MapReduce をどのように使用したかを説明する記事が載っています。Yelp では毎日 100GB のログ・データを生成することから、このサイトが抱える問題は、典型的な Big data の問題と言えます。

当初、Yelp のエンジニアは Hadoop クラスターを独自に構築していましたが、最終的には Amazon の Elastic MapReduce サービスをベースに動作する、mrjob という独自の MapReduce フレームワークを作成する結果となりました。Yelp で検索およびデータ・マイニングを担当するエンジニアの Dave M は、次のように述べています。

「私たちが、『People Who Viewed this Also Viewed...』サイトの機能をどのように実現しているかと言うと、ご想像のとおり MapReduce を使用しています。MapReduce は 1 つのジョブを小さく分割するには最も単純な方法です。MapReduce では基本的に、mapper が入力行を読み取って (key, value) のタプルに分割し、各キーとそれに対応する値のすべてを reducer に送信します。これが、私たち独自の mrjob という Python フレームワークで作成した、単語の出現数をカウントする単純な MapReduce ジョブです」。

Dave M はさらに、こう続けています。

「以前は他の多くの会社と同じように Hadoop クラスターを実行するという方法を取っていて、コードを Web サーバーにプッシュするときには常にコードを Hadoop マシンにプッシュしていました。

このやり方は、ジョブがコード・ベースの他のあらゆるコードを参照できるという点では、まあまあ良かったと言えます。

けれどもジョブを本番環境に移すまでは、そのジョブが機能するかどうか確実にわからないという点は、問題でした。さらに最悪なことに、ほとんどの時間、クラスターはアイドル状態で、たまにかなり大きなジョブがあるとノードのすべてを占領してしまうため、他のすべてのジョブが待機しなければなりませんでした」。

Amazon クラウドで動作する MapReduce によって、Yelp はその独自の Hadoop クラスターの使用をやめることができました。そして Yelp の mrjob フレームワークは 1 年の間に安定し、今では GitHub で共有されるようになっています。

クラウド・コンピューティングと MapReduce は、Big data ジョブに合った組み合わせのように思えます。ここからは、大量のログ・データを処理する方法を説明します。


実際のログ・ファイルの処理

多くの人々が実際に直面している問題は、大量のログ・データをいかに処理するかという問題です。リスト 1 のコード (ダウンロードでも入手できます) は、6.3GB の IIS (Internet Information Services) ログ・ファイルを Python のマルチプロセッシング・モジュールだけを使って要約した例です。このコードを MacBook Pro ラップトップで実行したところ、約 2 分間で最初の 25 個の IP アドレスが生成されました。

リスト 1. Python のマルチプロセッシング・モジュールを使用して 6.3GB のログ・ファイルを要約する例
Code Listing:  iis_map_reduce_ipsum.py
"N-Core Map Reduce Log Parser/Summation"

from collections import defaultdict
from operator import itemgetter
from glob import glob
from multiprocessing import Pool, current_process
from itertools import chain

def ip_start_mapper(logfile):
    log = open(logfile)
    for line in log:
        yield line.split()

def ip_cut(lines):
    for line in lines:
        try:
            ip = line[8]
        except IndexError:
            continue
        yield ip, 1

def mapper(logfile):
    print "Processing Log File: %s-%s" % (current_process().name, logfile)
    lines = ip_start_mapper(logfile)
    cut_lines = ip_cut(lines)
    return ip_partition(cut_lines)

def ip_partition(lines):
    partitioned_data = defaultdict(list)
    for ip, count in lines:
        partitioned_data[ip].append(count)
    return partitioned_data.items()        

def reducer(ip_key_val):
    ip, count = ip_key_val
    return (ip, sum(sum(count,[])))

def start_mr(mapper_func, reducer_func, files, processes=8, chunksize=1):
    pool = Pool(processes)
    map_output = pool.map(mapper_func, files, chunksize)
    partitioned_data = ip_partition(chain(*map_output))
    reduced_output = pool.map(reducer_func, partitioned_data)
    return reduced_output

def print_report(sort_list, num=25):
    for items in sort_list[0:num]:
        print "%s, %s" % (items[0], items[1])
def run():
    files = glob("*.log")
    ip_stats = start_mr(mapper, reducer, files)
    sorted_ip_stats = sorted(ip_stats, key=itemgetter(1), reverse=True)
    print_report(sorted_ip_stats)
    
if __name__ == "__main__":
    run()

図 1 に、この動作を図解します。

図 1. IIS ログ・ファイルを MapReduce で処理する図
IIS ログ・ファイルを MapReduce で処理する図

以下に、順を追ってコードの内容を説明します。ご覧のように、このコードは非常に小さく、約 50 行しかありません。

  • mapper 関数が効率的に各行から IP アドレスを抽出して、その IP アドレスを値 1 と一緒に返します。これは (key, value) の抽出段階であり、生成されるプロセスのそれぞれで行われます。処理結果は、集約フェーズで使えるように、イテレーターによる処理の可能な要素が連なるオブジェクト (詳しくは、chain(*iterables) およびその他の Python の itertools を参照) にまとめられます。これは、「データ・パーティショニング」と呼ばれます。
  • MapReduce ライフサイクルの次の段階では、中間結果のすべてがまとめて要約されます。この例では、それを行うのは reduce 関数であり、この関数の中には集約フェーズのすべてが含まれています。
  • 最後に巨大なリストのなかから先頭の 25 個の結果が出力されます。

ここでは MapReduce を説明するのに簡単な手段としてマルチプロセッシング・モジュールを使用しましたが、このコードを少し変更すれば、他の MapReduce クラウドでも実行することができます。リスト 2 に、このジョブの完全な出力を記載します。

リスト 2. リスト 1 を実行して生成された完全な出力
lion% time python iisparse.py
Processing Log File: PoolWorker-1-ex100812.log
Processing Log File: PoolWorker-2-ex100813.log
Processing Log File: PoolWorker-3-ex100814.log
Processing Log File: PoolWorker-4-ex100815.log
Processing Log File: PoolWorker-5-ex100816.log
Processing Log File: PoolWorker-6-ex100817.log
Processing Log File: PoolWorker-7-ex100818.log
Processing Log File: PoolWorker-8-ex100819.log
Processing Log File: PoolWorker-7-ex100820.log
Processing Log File: PoolWorker-3-ex100821.log
Processing Log File: PoolWorker-8-ex100822.log
Processing Log File: PoolWorker-4-ex100823.log
Processing Log File: PoolWorker-6-ex100824.log
Processing Log File: PoolWorker-1-ex100825.log
Processing Log File: PoolWorker-2-ex100826.log
10.0.1.1, 24047
10.0.1.2, 22667
10.0.1.4, 20234
10.0.1.5, 18180
[...output supressed for space, and IP addresses changed for privacy]
python iisparse.py  57.40s user 7.48s system 54% cpu 1:59.47 total

まとめ

次のステップ

次のステップとなるのはもちろん、この記事の「参考文献」セクションを参照することです。「自然言語処理」のセクションや「この記事で取り上げたトピックについて詳しく学ぶ」のセクションに重点を絞るのもお勧めです。

さらに、Yelp の mrjob と The IBM Distribution of Apache Hadoop をダウンロードして試してみてください。

厳密に言えば、クラウド・コンピューティングと言うと、さまざまなものを意味する可能性があります。単純にデータ・センター内の仮想マシンでシーケンシャルにスクリプトを実行することを表す場合もあります。この記事では、MapReduce とクラウド・コンピューティングをそれぞれに支える理論の一部を用いて、大量のデータを要約するという実際の問題を解決しました。

クラウド・ベースの MapReduce の選択肢は、オープンソースのオファリンングにしても、商用のオファリングにしても豊富に用意されています。この記事で学んだ知識はペタバイドのログ・ファイルにも簡単に適用することができます。このことが、クラウド環境には、なおのこと MapReduce による抽象化が有用なツールとなる理由の核心です。


ダウンロード

内容ファイル名サイズ
Sample Python script for this articleMapReducePythonScript.zip1KB

参考文献

学ぶために

製品や技術を入手するために

  • Yelp の mrjob フレームワークは GitHub から入手できます。

議論するために

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Cloud computing
ArticleID=594569
ArticleTitle=クラウド関連の Big data の問題を MapReduce で解決する
publish-date=11082010