Twitter Storm でビッグ・データをリアルタイムに処理する

ビッグ・データのストリーム処理入門

Storm は、オープンソースのビッグ・データ処理システムです。Storm はリアルタイム分散処理を対象としている点、そして言語に依存しないという点で、他のビッグ・データ処理システムとは異なります。Twitter の Storm とそのアーキテクチャー、そしてさまざまなバッチ処理ソリューションおよびストリーム・プロセッシング・ソリューションについて学んでください。

2013年 3月 13日 ― IBM InfoSphere Streams についての情報を、囲み記事「IBM のストリーム・コンピューティング」として追加するとともに、「参考文献」にも追加しました。

M. Tim Jones, Independent author, Consultant

M.Tim JonesM. Tim Jones は組み込みソフトウェアのエンジニアであり、『Artificial Intelligence: A Systems Approach』、『GNU/Linux Application Programming』(現在、第 2 版です) や『AI Application Programming』(こちらも現在、第 2 版です)、それに『BSD Sockets Programming from a Multilanguage Perspective』などの著者でもあります。技術的な経歴は静止軌道衛星用のカーネル開発から、組み込みシステム・アーキテクチャーやネットワーク・プロトコル開発まで、広範にわたっています。彼はコロラド州ロングモン在住で、Intel に勤務するプラットフォーム・アーキテクトであり、執筆活動も行っています。



2014年 3月 13日 (初版 2012年 12月 06日)

明らかにもっともよく知られたビッグ・データ分析ソリューションである Hadoop では、バッチ処理にフォーカスして処理を行います。この処理モデルは、多くのケース (Web の索引付けなど) に十分対処できますが、極めて動的なソースからリアルタイムの情報が必要となるユース・モデルもあります。このビッグ・データをリアルタイムで処理するという問題を解決するために導入されたのが、Nathan Marz 氏 (BackType 社を経て、現在は Twitter 社に勤務) が開発した Storm です。Storm は静的データを処理するだけでなく、絶え間なく発生することが見込まれるストリーミング・データも処理することができます。Twitter のユーザーたちが毎日 1.4 億件ものツイートを生成していることを考えると、この技術がいかに有用であるかは簡単に理解することができます。

IBM のストリーム・コンピューティング

IBM のビッグ・データ・プラットフォームの一部である InfoSphere Streams は、最先端のビッグ・データ処理を提供します。このハイパフォーマンス・コンピューティング・プラットフォームを使用すると、ユーザーは何千ものリアルタイム・ソースから送られてくる情報を迅速に取り込み、分析し、相互に関連付けるアプリケーションを開発または再利用できるようになります。InfoSphere Streams について詳しく学んでください。

その一方、Storm が持つ機能は単なる従来のビッグ・データ分析システムの機能にとどまらず、CEP (Complex Event-Processing: 複合イベント処理) システムの一例でもあります。CEP システムは、一般に計算指向または検出指向のいずれかに分類されますが、Storm にはそのそれぞれを、ユーザー定義のアルゴリズムによって実装することができます。例えば、CEP を使用すれば、大量のイベントから有意義なイベントを特定し、それらのイベントでリアルタイムにアクションを実行することができます。

Nathan Marz 氏によって Twitter 内でのさまざまな Storm の使用例が提供されていますが、なかでもとりわけ興味深い例は、トレンド情報の生成です。Twitter では、ツイートの Firehose から新しいトレンドを抽出し、それらのトレンドをローカルおよび国レベルで保持します。これが何を意味するのかと言うと、新しい話題が出現してくると、Twitter のトレンディング・トピック・アルゴリズムがその話題をリアルタイムで識別するということです。このリアルタイム・アルゴリズムは、Storm の内部に Twitter データの継続的分析プロセスとして実装されています。

「ビッグ・データ」の意味

「ビッグ・データ」とは、従来の手段では管理しきれない規模のデータを意味します。インターネット規模のデータが登場したことにより、この新たな種類のデータの処理に対応できる新しいアーキテクチャーとアプリケーションが次々と作成されています。これらのアーキテクチャーは極めてスケーラブルであり、多数のサーバーで効率的にデータを並行処理します。

Storm と従来のビッグ・データ・ソリューションとの違い

Storm を他のビッグ・データ・ソリューションから差別化しているのは、扱っているパラダイムの違いです。Hadoop は基本的にバッチ処理システムであり、データは HDFS (Hadoop File System) に取り込まれ、ノードに分散されて処理されます。処理が完了すると、結果のデータが HDFS に返されて依頼元で使用できるようになるという仕組みです。一方、Storm では、終わることのないデータ・ストリームを変換処理するトポロジーを構成できるようになっています。データが到着すると処理を継続する Hadoop のジョブとは異なり、Storm のトポロジーでは、永遠にデータ変換処理を続けます。


ビッグ・データの実装

Hadoop のコアは Java 言語で作られていますが、Hadoop のコアがサポートするデータ分析アプリケーションはさまざまな言語で作られています。最近のデータ分析アプリケーションは、新しい言語とその機能を利用するために、より難解な実装になっています。例えば、カリフォルニア大学 (UC) バークレー校の Spark は Scala 言語で実装されている一方、Twitter の Storm は Clojure (「クロージャー」と発音) で実装されています。

Clojure は Lisp 言語の最近の方言です。Clojure は Lisp と同じく関数型スタイルのプログラミングをサポートしていますが、Clojure にはマルチスレッド・プログラミング (Storm の構造に有益な機能) を簡易化する機能も組み込まれています。Clojure は仮想マシン (VM) ベースの言語であり、Java 仮想マシン上で動作します。Storm は Clojure で開発されているものの、Storm 内のアプリケーションは任意の言語で作成することができます。アダプターさえあれば、Storm のアーキテクチャーに接続することができます。アダプターには Scala、JRuby、Perl、および PHP 用が用意されていて、Storm トポロジーへのストリーミングをサポートする SQL (Structured Query Language) アダプターもあります。


Storm の主要な特性

Storm には、パフォーマンスと信頼性の点で Storm を特徴付ける一連の特性が実装されています。Storm はメッセージ・パッシングに ZeroMQ を使用して、メッセージを中間キューに入れることなく、タスク自体の間で直接メッセージを受け渡せるようにしています。メッセージングの内部では、自動化された効率的なメカニズムにより、メッセージから Storm のプリミティブ型へのシリアライズ、Storm のプリミティブ型からメッセージへのデシリアライズが行われます。

Storm をとりわけ興味深いものにしているのは、耐障害性と障害の管理に重点が置かれていることです。Storm は、各タプルがこのトポロジーを通して完全に処理されるように保証されたメッセージ処理を実装します。タプルが処理されていないことが検出されると、スパウトから自動的にタプルが再生されます。Storm はタスク・レベルでの障害検出機能も実装します。タスクの障害が発生した場合、処理を素早く再開するために、自動的にメッセージの再割り当てが行われます。Storm には Hadoop よりインテリジェントなプロセス管理機能が含まれており、リソースが適切に使用されるように、プロセスはスーパーバイザーによって管理されます。


Storm モデル

Storm が実装するデータ・フロー・モデルでは、データが変換処理エンティティーのネットワークを絶え間なく流れます (図 1 を参照)。データ・フローは、抽象化して「ストリーム (Stream)」と呼ばれます。ストリームは、限りないタプルの連続です。タプルは、標準データ型 (整数型、浮動小数点型、バイト配列型など) を表したり、シリアライズ・コードを追加したユーザー定義型を表したりすることができる構造体のようなものです。各ストリームは一意の ID で定義されます。データのソースとシンクからなるトポロジーは、このストリームの一意の ID を使用して作成することができます。ストリームのソースは「スパウト (Spout)」です。スパウトが、外部ソースのデータを Storm トポロジーに流し込みます。

図 1. 単純な Storm トポロジーの概念アーキテクチャー
単純な Storm トポロジーの概念アーキテクチャーを示す図

シンク (変換処理を行うエンティティー) は、「ボルト (Bolt)」と呼ばれます。ストリームに対して単一の変換処理を行うボルトによって、Storm トポロジー内でのすべての処理が行われます。ボルトは MapReduce のような従来の機能を実装することも、フィルタリング、集約、あるいは外部エンティティー (例えばデータベース) との通信など、複雑なアクション (シングル・ステップの機能) を実装することもできます。標準的な Storm トポロジーは複数の変換処理を実装するため、個々のタプル・ストリームを処理する複数のボルトが必要になります。スパウトとボルトは、いずれも Linux システム内に 1 つ以上のタスクとして実装されます。

Storm を使用すれば、単語の出現頻度を求めるための MapReduce 機能を簡単に実装することができます。図 2 に示されているように、スパウトがテキスト・データのストリームを生成し、ボルトが (ストリームに含まれる単語をトークン化するための) Map 関数を実装します。「map」ボルトで処理されたストリームは、(単語を集約してカウントするための) Reduce 関数を実装する 1 つのボルトに流れ込みます。

図 2. MapReduce 関数の単純な Storm トポロジー
MapReduce 関数の単純な Storm トポロジーを示す図

ボルトは、複数のボルトにデータを流すことも、複数のソースからデータを受け入れることもできます。Storm にはストリーム・グループ化という概念があり、この概念はシャッフル・グループ化 (ランダムに、ただしそれぞれのボルトに同じ数のタプルを分配) やフィールド・グループ化 (ストリームのフィールドに基づいてストリームを分割) を実装します。ストリーム・グループ化は他にもあり、例えばタプルの生成側が独自の内部ロジックを使用してタプルをルーティングすることもできます。

ただし、Storm アーキテクチャーの最も興味深い機能の 1 つになっているのは、「保証されたメッセージ処理」の概念です。Storm は、スパウトが送信するすべてのタプルが処理されることを保証します。一定のタイムアウト期間内でタプルが処理されなければ、Storm はタプルをスパウトから再生します。この機能には、トポロジーを通過するタプルを追跡するための賢いトリックが必要です。これこそが、Storm の重要な付加価値の 1 つとなっています。

信頼できるメッセージングのサポートに加え、Storm は ZeroMQ を使用してメッセージング・パフォーマンスを最大限に高めます (中間のキューイングを省き、タスク間で直接メッセージ・パッシングを行えるようにしています)。ZeroMQ は、内部に組み込まれた輻輳検出機能によってその通信方法を変化させて、使用可能な帯域幅を最適化します。


Storm の使用例

ここで、単純な MapReduce トポロジーを実装するコード (リスト 1 を参照) を通じて、Storm の例を見ていきましょう。この例では、GitHub から入手できる、Nathan 氏が作成した Storm スターター・キットに含まれている見事な単語数カウント・コードを使用します (「参考文献」のリンクを参照)。この例で説明するのは、図 2 に示したトポロジーです。このトポロジーは、1 つのボルトで構成された map 変換処理と、1 つのボルトで構成された reduce 変換処理を実装します。

リスト 1. 図 2 の Storm トポロジーを作成する
01  TopologyBuilder builder = new TopologyBuilder();
02          
03  builder.setSpout("spout", new RandomSentenceSpout(), 5);
04          
05  builder.setBolt("map", new SplitSentence(), 4)
06           .shuffleGrouping("spout");
07  
08  builder.setBolt("reduce", new WordCount(), 8)
09           .fieldsGrouping("map", new Fields("word"));
10  
11  Config conf = new Config();
12  conf.setDebug(true);
13  
14  LocalCluster cluster = new LocalCluster();
15  cluster.submitTopology("word-count", conf, builder.createTopology());
16  
17  Thread.sleep(10000);
18  
19  cluster.shutdown();

リスト 1 (参照用に、行番号を追加してあります) ではまず、TopologyBuilder を使用して新規トポロジーを宣言しています。次に 3 行目で、RandomSentenceSpout で構成された (spout という名前の) スパウトを定義しています。RandomSentenceSpout クラス (すなわち、nextTuple メソッド) は、5 つのランダムなセンテンスのうちの 1 つをそのデータとして出力します。setSpout メソッドの最後にある引数 5 は、並列処理であることを示唆する値 (つまり、このアクティビティー用に作成するタスクの数) です。

5 行目と 6 行目では、最初のボルト (アルゴリズムによる変換処理エンティティー) を定義します。この例で定義するのは、map (分割) ボルトです。このボルトは、入力ストリームをトークン化して個々の単語を出力するために、SplitSentence を使用します。6 行目で shuffleGrouping を使用してこのボルトの入力サブスクリプションを定義していること、そしてストリーム・グループ化がシャッフルとして定義されていることに注意してください。このシャッフル・グループ化は、スパウトからの入力が「シャッフルされる」ことを意味します。つまり、入力はこのボルトに含まれる複数のタスクにランダムに分配されます (4 つのタスクによる並列処理であることが示唆されています)。

8 行目と 9 行目では、実質的に reduce 要素の役割を果たす最後のボルトを定義し、その入力として map ボルトを指定します。WordCount メソッドは必要とされる単語数カウント動作 (カウント全体を保持するために同様の単語をグループ化する動作) を実装しますが、このグループ化はシャッフルではないため、出力には一貫性があります。reduce 動作を実装するタスクが複数あるとしたら、カウントはすべての合計ではなく、分割されたカウントとなります。

11 行目と 12 行目で、構成オブジェクトを定義してデバッグ・モードを有効にします。Config クラスに含まれる可能性のある構成は多数に上ります (Storm クラス・ツリーについての詳細は、「参考文献」に記載されているリンクを参照してください)。

14 行目と 15 行目で作成しているのはローカル・クラスターです (この例では、ローカル・モードの使用を定義しています)。このローカル・クラスターを作成するために、クラスターの名前、構成オブジェクト、およびトポロジー (builder クラスの createTopology 要素によって取得) を定義します。

最後に 17 行目で、Storm を一定の期間スリープ状態にして、19 行目でクラスターをシャットダウンします。Storm は永遠に動作するシステムであることを思い出してください。したがって、タスクは長期間にわたって存続し、サブスクライブ対象のストリームで新しいタプルを処理し続ける可能性があります。

スパウトとボルトの詳細を含め、この驚くほど単純な実装についての詳細は、Storm スターター・キットで学ぶことができます。


Storm の使用

Nathan Marz 氏により、クラスター・モードとローカル・モードのそれぞれで運用する場合の Storm のインストール方法を詳しく説明している一連の理解しやすいドキュメントが用意されています。ローカル・モードでは、多数のノードからなる大規模なクラスターを必要とせずに Storm を使用することができます。Storm をクラスターで使用する必要がある一方、ノードが足りない場合には、Storm クラスターを Amazon EC2 (Elastic Compute Cloud) 内に実装することもできます。それぞれの Strom モード (ローカル、クラスター、Amazon EC2) については、「参考文献」を参照してください。


オープンソースのその他のビッグ・データ・ソリューション

Google が 2004年に MapReduce パラダイムを導入して以来、このオリジナルの MapReduce パラダイムを使用した (またはその品質を持つ) いくつかのソリューションが登場しています。Google による当初の MapReduce アプリケーションは、WWW の索引付けを目的としていました。このアプリケーションは今でもよく使われていますが、この単純なモデルで解決する問題は増え続けています。

表 1 に、従来のバッチ処理アプリケーションやストリーム処理アプリケーションを含め、現在使用できるオープンソースのビッグ・データ・ソリューションを記載します。Storm がオープンソースとして公開される約 1 年前に、Yahoo! の S4 分散ストリーム・コンピューティング・プラットフォームが Apache にオープンソースとして公開されています。2010年 10月にリリースされた S4 は、並列処理の複雑さをアプリケーション開発者に見せない HPC (High-Performance Computing) プラットフォームです。S4 が実装する分散クラスター・アーキテクチャーはスケーラブルで、部分的な耐障害性を備えています。

表 1. オープンソースのビッグ・データ・ソリューション
ソリューション開発者タイプ説明
StormTwitterストリーム処理ストリーム処理による Twitter の新しいビッグ・データ分析ソリューション
S4Yahoo!ストリーム処理Yahoo! の分散型ストリーム・コンピューティング・プラットフォーム
HadoopApacheバッチ処理MapReduce パラダイムの最初のオープンソース実装
SparkUC Berkeley AMPLabバッチ処理インメモリー・データ・セットとレジリエンシーをサポートする比較的最近の分析プラットフォーム
DiscoNokiaバッチ処理Nokia の分散型 MapReduce フレームワーク
HPCCLexisNexisバッチ処理ビッグ・データを対象とした HPC クラスター

さらに詳しく調べてください

Hadoop が最もよく知られたビッグ・データ分析ソリューションであり続けることに変わりはありませんが、他にも数々のソリューションがあり、それぞれに異なる特性があります。例えば、以前の記事で取り上げた Spark には、データ・セットのインメモリー機能が (消失したデータを再構築する機能とともに) 組み込まれています。けれども、Hadoop と Spark は、大量のデータ・セットをバッチ処理することに焦点を置いているという点では同じです。一方、新しいビッグ・データ分析モデルとなる Storm は、最近オープンソースとして公開されたことから、かなりの注目を集めています。

Hadoop とは異なり、Storm は計算システムであり、ストレージの概念を導入していません。そのため、Storm は、データが従来とは異なるソースから動的に到着しようが、データベースなどのストレージ・システムに保管されていようが、さまざまなコンテキストで使用することができます (あるいは、取引システムなどの他の機器をリアルタイムで操作するためにコントローラーが使用するデータにも使用することができます)。

Storm についての詳細、クラスターを稼働させる方法、およびその他のビッグ・データ分析ソリューション (バッチ処理とストリーム処理の両方) については、「参考文献」に記載されているリンクを参照してください。

参考文献

学ぶために

  • CEP (複合イベント処理) は、Storm だけでなく、Yahoo! の S4 をはじめとする他の多くのソリューションでも実装しているパターンです。Storm と S4 の大きな違いは、Storm は障害発生時のメッセージ処理を保証する一方、S4 ではメッセージを失う可能性があることです。
  • Storm の中心的開発者である Nathan Marz 氏が、この新しいオファリングを紹介する、興味深くて参考になる記事を書いています。Storm が初めて紹介されたのは、2011年 5月付けの記事「Preview of Storm: The Hadoop of Realtime Processing - BackType Technology」です。これに続き、8月にブログ「A Storm is coming: more details and plans for release」が投稿されています。
  • Storm wiki に、Storm とその原理に関するドキュメント、そして Storm の導入方法と新規プロジェクトにセットアップ方法を解説する各種チュートリアルが揃っています。また、Storm をローカル・モード、クラスター、Amazon で使用する方法を含め、Storm のさまざまな側面について参考になる資料も見つかります。
  • Spark: 高速なデータ分析のための新たな手段」(M. Tim Jones著、developerWorks、2011年11月) では、カリフォルニア大学バークレー校で開発されたインメモリー・レジリエンシーを備えたデータ分析プラットフォームを紹介しています。
  • アプリケーション仮想化の過去と未来」(M. Tim Jones 著、developerWorks、2011年5月) では、言語の抽象化に対する仮想化の適用について詳しく説明しています。Storm はその実装に VM ベースの言語である Clojure を使用し、さらに Java 技術やその他多くの言語を使用して内部 (ボルト) アプリケーションを作成します。
  • GitHub で、Storm のクラスおよびインターフェースを詳細に説明する完全なクラス・ツリーを参照してください。
  • Hadoop が、単純なバッチ処理を超えるモデルに対応するようになりました。例えば、バッチ・レベルのデータ処理よりも対話性に重点を置く場合、Hadoop ではスケジューリング機能を使ってデータの処理方法を変更できます。詳しくは、「Hadoop のスケジューリング機能」(M. Tim Jones 著、developerWorks、2011年12月) を読んでください。
  • IBM InfoSphere Streams: 驚くほど高いデータ・スループット・レートで 1 秒あたり何百万ものイベントやメッセージを処理することができる、極めてスケーラブルで強力なアナリティクス・プラットフォームを入手してください。
  • developerWorks Open source ゾーンには、オープソースのツールおよびオープンソース技術の使用に関する情報が豊富に揃っています。
  • さまざまな IBM 製品および IT 業界についての話題に絞った developerWorks のテクニカル・イベントや Webcast で最新情報を入手してください。
  • 無料の developerWorks Live! briefing に参加して、IBM の製品およびツールについての情報や IT 業界の動向についての情報を迅速に把握してください。
  • developerWorks の on-demand demos で、初心者向けの製品のインストールとセットアップから、熟練開発者向けの高度な機能に至るまで、さまざまに揃ったデモを見てください。
  • Twitter で developerWorks をフォローしてください。また、この記事 M. Tim Jones を Twitter でフォローすることもできます。

製品や技術を入手するために

  • ZeroMQ は、スケーラブル環境でのメッセージングを効率化するインテリジェントなトランスポート層です。ZeroMQ サイトで、このオファリングの概要、これを使用して問題を解決する方法、そしてこの取り組みをサポートする方法を学んでください。
  • Apache Zookeeper は、分散環境で極めて信頼性に優れた調整を可能にするオープンソースのプロジェクトです。Storm では、クラスター内の一連のノードでの調整機能として Zookeeper を使用しています。
  • Clojure は、Storm システムを実装するために使用されている言語です。Clojure は、Lips 言語の比較的新しい方言で、汎用言語として Rich Hicky 氏により作成されました。Clojure はまた、マルチスレッド・プログラミングを単純化します。
  • Apache Hadoop は、Yahoo! が開発した MapReduce プログラミング用のプラットフォームです。Hadoop に続き、MapReduce プログラミング用プラットフォームとしてカリフォルニア州バークレー校で開発された Spark は、Scala 言語で作成され、インメモリー処理によるレジリエンシーを備えたオープンソースのビッグ・データ・オファリングです。
  • Storm の他にも、オープンソースとして使用できるビッグ・データ・オファリングがあります。そのうちの 1 つ、Yahoo! S4 は Storm と同じくストリーム処理をベースとしたビッグ・データ・プラットフォームです。Hadoop のようなバッチ処理指向のオファリングには、Nokia の Disco プロジェクトLexisNexis の HPCC があります。
  • IBM InfoSphere Streams: InfoSphere Streams をダウンロードして、何千ものリアルタイム・ソースから送られてくる情報を迅速に取り込み、分析し、相互に関連付けるアプリケーションを構築してください。
  • ご自分に最適な方法で IBM 製品を評価してください。評価の方法としては、製品の試用版をダウンロードすることも、オンラインで製品を試してみることも、クラウド環境で製品を使用することもできます。

議論するために

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Open source, ビジネス・アナリティクス, Java technology, Linux
ArticleID=848510
ArticleTitle=Twitter Storm でビッグ・データをリアルタイムに処理する
publish-date=03132014