目次


Spark: 高速なデータ分析のための新たな手段

Comments

Spark は Hadoop と同じようにオープンソースのクラスター・コンピューティング環境ですが、いくつか実用的な面で違いがあり、特定のワークロードに非常に適しています。つまり Spark ではインメモリー分散データセットを扱えるため、対話型のクエリーのみならず反復的なワークロードも最適化することができます。

Spark は Scala 言語で実装されており、アプリケーション・フレームワークとして Scala を使用しています。Hadoop とは異なり、Spark と Scala は緊密に統合されており、Scala は分散されたデータセットをローカル・オブジェクトの集合として容易に操作することができます。

Spark は分散されたデータセットに対して反復的にジョブを実行するために作成されたものですが、実際には Hadoop を補完し、Hadoop ファイルシステムと並行して Spark を実行することができます。この動作はサードパーティーによる Mesos というクラスタリング・フレームワークを使用することで実現されます。Spark はカリフォルニア大学バークレー校 AMP Lab (Algorithms, Machines, and People Lab) で、大規模で低遅延のデータ分析アプリケーションを作成するために開発されました。

Spark のクラスター・コンピューティング・アーキテクチャー

Spark は Hadoop と共通する点もありますが、実用的な面では Hadoop と違いのある新しいクラスター・コンピューティング・フレームワークと言うことができます。何と言っても、Spark はクラスター・コンピューティングにおける特定のタイプのワークロードを対象に設計されています。具体的には、(機械学習アルゴリズムなどの) 並列演算全体にわたってデータのワーキング・セットを再利用するワークロードを対象としています。こうした種類のワークロードを最適化するために、Spark ではインメモリー・クラスター・コンピューティングという概念を導入しています。この方式では、データセットをメモリー内にキャッシュすることによってデータセットへのアクセス遅延を低減しています。

また Spark は RDD (Resilient Distributed Dataset) という抽象化も導入しています。RDD とは、一連のノードに分散された読み取り専用オブジェクトの集合のことです。これらの集合は、データセットの一部が消失した場合にも再構築できるため、レジリエンシーに優れています。データセットの一部を再構築するプロセスは lineage を保持するフォルト・トレランス・メカニズムを利用します (lineage というのは、データを抽出した際のプロセスに基づいてデータセットの一部を再構築するための情報です)。RDD は Scala オブジェクトとして表現され、ファイルから作成することも、(複数ノードに分散された) 並列化されたスライスとして作成することも、別の RDD から変換して作成することも、既存の RDD のパーシスタンスを変更することによって (例えばメモリー内のキャッシュに入れるように要求することなどによって) 作成することもできます。

Spark では、アプリケーションはドライバーと呼ばれ、単一ノードで実行される演算や、一連のノードにわたって並列に実行される演算を実装するのは、このドライバーです。Hadoop の場合と同様、Spark は単一ノードのクラスターもマルチノードのクラスターもサポートします。マルチノードによる演算の場合、Spark は Mesos クラスター・マネージャーを利用します。Mesos は分散アプリケーションでリソースを共有、分離するための効率的なプラットフォームです (図 1)。この構成により、1 つの共有ノード・プールで Spark と Hadoop が共存することができます。

図 1. Spark は Mesos クラスター・マネージャーを利用してリソースを共有、分離する
リソースの共有と分離に関する Mesos と Spark の関係を示した図
リソースの共有と分離に関する Mesos と Spark の関係を示した図

Spark のプログラミング・モデル

ドライバーはデータセットに対し、アクションと変換という 2 つのタイプの操作を行うことができます。アクションはデータセットに対して計算を実行し、値をドライバーに返します。変換は既存のデータセットから新しいデータセットを作成します。アクションの例としては、(関数を使用して) Reduce 演算を実行する場合や、データセットに対して繰り返し処理を行う場合 (Map 演算と同じように各要素に対して関数を実行する場合) などがあります。変換の例としては、Map 演算や Cache 操作 (新しいデータセットをメモリーに格納するように要求する操作) などがあります。

この 2 つの操作の例について、この後説明しますが、その前に Scala 言語について説明しておきます。

Scala の簡単な紹介

Scala はインターネットで使用されている、とっておきの言語の 1 つかもしれません。Twitter、LinkedIn、Foursquare など、インターネット上で最もアクセスが頻繁な Web サイトで、(Web アプリケーション・フレームワークである Lift を通じて) Scala が使用されています。また、金融機関が Scala のパフォーマンスに関心を持ったことを示す証拠もあります (例えば EDF Trading が金融派生商品の価格設定に使用している、など)。

Scala は、命令型言語、関数型言語、オブジェクト指向言語に関連する機能を違和感なく手軽な方法でサポートするという意味で、マルチパラダイムの言語です。オブジェクト指向の観点から見ると、Scala ではすべての値はオブジェクトです。同様に、関数型の観点から見ると、すべての関数は値です。また Scala は、表現力豊かでタイプセーフな型体系を持つ静的型付けの言語でもあります。

さらに、Scala は仮想マシン (Virtual Machine: VM) 言語でもあり、Scala コンパイラーが生成するバイト・コードと Java ランタイム環境 (Java Runtime Environment: JRE) を使用して Java 仮想マシン (Java Virtual Machine: JVM) 上で直接実行することができます。この構成により、JVM を実行可能な環境であればほぼすべての環境で Scala を実行することができます (ただし Scala ランタイム・ライブラリーを追加する必要があります)。さらには、既存の膨大な Java ライブラリーや、皆さんがこれまで作成した Java コードを活用することもできます。

最後に、Scala は拡張可能です。実際には、Scala という名前は Scalable Language (拡張可能な言語) を略したものであり、Scala は簡単に拡張できるように定義されており、その拡張をすっきりとした形で言語に統合することができます。

Scala の例

実際の Scala 言語の例をいくつか見てみましょう。Scala には独自のインタープリターが付属しているため、インタラクティブな方法で Scala を試すことができます。この記事では Scala の有効な使い方については説明しませんが、詳しい情報へのリンクを「参考文献」に挙げてあります。

リスト 1 以降では、Scala のインタープリターを使用して Scala 言語の動作を簡単に調べます。Scala を起動すると Scala のプロンプトが表示され、このプロンプトを使って式やプログラムの評価をインタラクティブに行うことができます。まず、2 つの変数を作成します。1 つは不変変数 (val: 単一代入変数と呼ばれます) であり、もう 1 つは可変変数 (var) です。var として定義されている b を変更するのは成功しますが、val を変更しようとするとエラーが返されることに注意してください。

リスト 1. Scala での単純な変数
$ scala
Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val a = 1
a: Int = 1

scala> var b = 2
b: Int = 2

scala> b = b + a
b: Int = 3

scala> a = 2
<console>6: error: reassignment to val
       a = 2
         ^

次に、Int の 2 乗を計算して結果を返す単純なメソッドを作成します。Scala でメソッドを定義するには、def に続けてメソッド名と一連のパラメーターを指定し、等号の後にいくつかの命令 (この場合は 1 つの命令) を記述します。戻り値はメソッド自体から推論することができるため、戻り値は指定されていません。この方法が変数に値を代入する方法と非常によく似ていることに注目してください。ここでは 3 というオブジェクトに対してこのプロセスを実行し、res0 という結果が示されています (res0 は Scala インタープリターによって自動的に作成されます)。このすべてを示したものがリスト 2 です。

リスト 2. Scala での単純なメソッド
scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

次に、単純なクラスを Scala で作成する方法を見てみましょう (リスト 3 を参照)。ここでは名前のコンストラクターを String 引数として受け付ける単純な Dog クラスを定義しています。ここで、クラスがパラメーターを直接受け取ることに注意してください (クラス本体にクラスのパラメーターの定義がありません)。1 つのメソッドがあり、このメソッドは呼び出されるとストリングを出力します。クラスのインスタンスを新たに作成し、それからメソッドを呼び出します。インタープリターが縦棒を挿入していることに注意してください。縦棒はコードの一部ではありません。

リスト 3. Scala での単純なクラス
scala> class Dog( name: String ) {
     |   def bark() = println(name + " barked")
     | }
defined class Dog

scala> val stubby = new Dog("Stubby")
stubby: Dog = Dog@1dd5a3d

scala> stubby.bark
Stubby barked

scala>

これを終えたら、「:quit」と入力し、Scala インタープリターを終了します。

Scala と Spark をインストールする

最初のステップとして、Scala をダウンロードして構成します。リスト 4 に示すコマンドは、Scala のインストールをダウンロードして準備する方法を示しています。Spark のドキュメントによれば Scala のバージョン 2.8 が必要なので、このバージョンの Scala を使用します。

リスト 4. Scala をインストールする
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz
$ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/

Scala のインストール場所を見つけられるように、以下の行を .bashrc に追加します (シェルとして Bash を使用している場合で説明しています)。

export SCALA_HOME=/opt/scala-2.8.1.final
export PATH=$SCALA_HOME/bin:$PATH

次に、リスト 5 に示すように Scala のインストールをテストします。これらの一連のコマンドにより、bashrc ファイルへの変更がロードされ、Scala インタープリター・シェルに対する簡単なテストが実行されます。

リスト 5. インタラクティブな Scala を構成し、実行する
$ scala
Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> println("Scala is installed!")
Scala is installed!

scala> :quit
$

リスト 5 に示すように、Scala プロンプトが表示されるはずです。「:quit」と入力して終了します。Scala が JVM のコンテキストで実行されることに注意してください。そのため、JVM も必要です。私は Ubuntu を使用しており、Ubuntu にはデフォルトで OpenJDK が付属しています。

次に、Spark フレームワークの最新コピーを入手します。そのためにはリスト 6 に示すスクリプトを使用します。

リスト 6. Spark フレームワークをダウンロードしてインストールする
wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/
mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

次に、Scala のホーム・ディレクトリーを指定する以下の行を ./conf/spar-env.sh に追加して、Spark の構成をセットアップします。

export SCALA_HOME=/opt/scala-2.8.1.final

セットアップの最後のステップとして、simple-build-tool (sbt) を使用してディストリビューションを更新します。sbt は Scala のビルド・ツールであり、Spark のディストリビューションで使用されています。mesos-spark-c86af80 サブディレクトリーで以下のように更新とコンパイルのステップを実行します。

$ sbt/sbt update compile

このステップを実行する場合にはインターネットに接続されている必要があることに注意してください。このステップを終了したら、リスト 7 のように Spark を簡単にテストします。このテストは、円周率の近似値を計算する SparkPi というサンプルを実行するように要求しています (単位正方形内でランダム・ポイント・サンプリングを行うことで近似値を計算しています)。ここに示したフォーマットでは、サンプル・プログラム (spark.examples.SparkPi) とホスト・パラメーターを要求しています (ホスト・パラメーターにより、Mesos マスター (この場合は単一ノードのクラスターなので、localhost) と使用するスレッド数を定義します)。リスト 7 では 2 つのタスクが実行されていますが、2 つのタスクが連続して実行されることに注意してください (タスク 0 が起動されて終了した後に、タスク 1 が起動されています)。

リスト 7. Spark を簡単にテストする
$ ./run spark.examples.SparkPi local[1]
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.SparkContext: Starting job...
11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s
Pi is roughly 3.14952
$

スレッドの数を増やすと、並列に実行されるスレッドの数を増やせるだけではなく、より短時間でジョブを実行できるようになります (それを示したものがリスト 8 です)。

リスト 8. 別の簡単なテストとして Spark で 2 つのスレッドを使用する
$ ./run spark.examples.SparkPi local[2]
11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.SparkContext: Starting job...
11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s
Pi is roughly 3.14052
$

Scala を使用して単純な Spark アプリケーションをビルドする

Spark アプリケーションをビルドするためには、Spark とその依存関係を 1 つの Java アーカイブ (JAR) ファイルに含める必要があります。以下のように sbt を使用して、この JAR を Spark の最上位ディレクトリーに作成します。

$ sbt/sbt assembly

すると ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar" というファイルが作成されます。このファイルを CLASSPATH に追加して利用できるようにします。この例では、この JAR は使用しません。なぜなら、ここでは JAR をコンパイルせずに Scala インタープリターを使用して実行するからです。

この例では標準的な MapReduce 変換を使用します (リスト 9 を参照)。この例はまず、必要な Spark クラスをインポートします。次に、クラス (SparkTest) と、このクラスの main メソッドを定義します。main メソッドは後で使用する引数を構文解析します。これらの引数により、Spark を実行する環境 (この場合は単一ノードのクラスター) を定義します。次に、クラスターへのアクセス方法を Spark に指示する SparkContext オブジェクトを作成します。このオブジェクトには、(SparkContext の main メソッドに引数として渡される) Mesos マスターの名前と、ジョブを割り当てる対象となる名前 (SparkTest) が引数として必要です。続いて、コマンドラインから入力されたスライスの数を解析します (このスライスの数が、このジョブのために Spark が使用するスレッド数となります)。セットアップ項目として最後に残っているのは、MapReduce 演算に使用するテキスト・ファイルを指定することです。

いよいよ、Spark のサンプルの本質的な部分に入ります。この部分は一連の変換で構成されています。対象のファイルを使用して flatMap メソッドを呼び出すと、(指定された関数によってテキスト行をトークンに分割することにより) RDD が返されます。この RDD は、キーと値のペアを作成する map メソッドに渡され、最後に、キーと値のペアを集約する reduceByKey メソッドに渡されます。そのために、キーと値のペアが _ + _ という匿名関数に渡されます。この関数は単純にキーと値という 2 つのパラメーターを取り、その 2 つ (StringInt) を連結することによって結果を返します。そしてこの値がテキスト・ファイルとして (出力ディレクトリーに) 出力されます。

リスト 9. Scala/Spark での MapReduce (SparkTest.scala)
import spark.SparkContext
import SparkContext._

object SparkTest {

  def main( args: Array[String]) {

    if (args.length == 0) {
      System.err.println("Usage: SparkTest <host> [<slices>]")
      System.exit(1)
    }

    val spark = new SparkContext(args(0), "SparkTest")
    val slices = if (args.length > 1) args(1).toInt else 2

    val myFile = spark.textFile("test.txt")
    val counts = myFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)

    counts.saveAsTextFile("out.txt")

  }

}

SparkTest.main(args)

このスクリプトを実行するには、以下のコマンドを入力して実行するように要求するだけです。

$ scala SparkTest.scala local[1]

MapReduce テスト・ファイルは (output/part-00000 として) 出力ディレクトリーに出力されます。

ビッグ・データ分析のための他のフレームワーク

Hadoop が開発されて以来、他にも注目に値するビッグ・データ分析プラットフォームがいくつも登場しています。これらのプラットフォームは、単純なスクリプト・ベースのものから Hadoop と同じような本番環境に至るまで多様です。

最も単純なプラットフォームの 1 つは bashreduce と呼ばれるものです。名前からもわかるように、bashreduce を使用すると、Bash 環境の複数のマシンで MapReduce タイプの演算を実行することができます。bashreduce は使用する予定のクラスターのマシンに対して (パスワードなしの) Secure Shell を使用して接続します。そして UNIX スタイルのツール (sortawknetcat など) を使用してジョブをリクエストする際にはスクリプトとして機能します。

GraphLab も MapReduce 抽象化の実装として興味深いものの 1 つであり、機械学習アルゴリズムを並列実装することに重点を置いています。GraphLab では、(別々のホスト上で) 個別に独立して計算を実行できるように Map ステージで計算を定義し、Reduce ステージで計算結果を組み合わせます。

最後に、ビッグ・データ処理に最近登場したものとして、Twitter の Storm があります (Twitter が BackType を買収したことによって Storm は Twitter のものになりました)。Storm は「リアルタイム処理の Hadoop」と定義されており、ストリーム処理と連続計算に重点が置かれています (計算の結果としてストリームが出力されます)。Storm は Clojure (Lisp 言語の最新の方言) で作成されていますが、(Ruby や Python など) どのような言語で作成されたアプリケーションもサポートします。Twitter は 2011年 9月に Strom をオープンソースとしてリリースしました。

詳細については「参考文献」を参照してください。

さらに詳しく調べてください

Spark は、増えつつあるビッグ・データ分析ソリューションに新たに加わった興味深いツールです。Spark は分散されたデータセットを処理するための効率的なフレームワークであるだけではなく、(単純で簡潔な Scala スクリプトにより) 実際に効率的な方法で処理を行います。Spark も Scala も活発に開発が進められています。しかし、インターネット上の主要なサービスで採用されたことから、既に Spark と Scala は共に、興味深いオープンソースのソフトウェアから基本となる Web 技術に移行したようです。


ダウンロード可能なリソース


関連トピック

  • EDF Trading: Implementing a domain-specific language for derivative pricing with Scala: Scala は株取引など、さまざまな業界に採用されています。その一例をこの動画で学んでください。この動画では、EDF Trading がどのように Scala を利用して金融派生商品の価格設定用ドメイン特化言語を実装したかを解説しています。
  • アプリケーション仮想化の過去と未来」(M. Tim Jones 著、developerWorks、2011年5月) は、仮想マシン言語とその実装について紹介しています。
  • Ceylon: 本物の進化なのか、それともありふれた新しい言語なのか」(M. Tim Jones 著、developerWorks、2011年7月) は、JVM を利用するもう 1 つの興味深い (開発中の) 仮想マシン言語を解説しています。
  • First Steps to Scala は Scala 言語を紹介した素晴らしい記事です (この記事の一部は Scala の設計者である Martin Odersky 氏が執筆しています)。2007年に書かれたこの長い紹介記事は、Scala のさまざまな側面を解説しています。もう 1 つ有用な例として、多種多様なコード・パターンで Scala を解説した Code Examples for Programming in Scala があります。
  • Linux と Hadoop による分散コンピューティング」(Ken Mann と M. Tim Jones の共著、developerWorks、2008年12月) は、大量のデータを分散処理するための MapReduce の仕組みを含めて Hadoop のアーキテクチャーを紹介しています。
  • developerWorks on Twitter: developerWorks の最新ニュースをフォローしてください。この記事の著者も M. Tim Jones から Twitter でフォローすることもできます。
  • developerWorks の Open source ゾーン: オープンソース技術を使用した開発や、IBM 製品でオープンソース技術を使用するためのハウ・ツー情報やツール、プロジェクトの更新情報など、豊富な情報が用意されています。
  • developerWorks podcasts: ソフトウェア開発者のための興味深いインタビューや議論を聞いてください。
  • Spark は Scala 言語で作成、サポートされたインメモリー・データ分析ソリューションを導入しています。
  • simple-build-tool は Scala 言語に採用されているビルド・ソリューションです。simple-build-tool は小規模なプロジェクトのための単純な手段であり、また複雑なビルドのための高度な機能も備えています。
  • Lift は Scala のための Web アプリケーション・フレームワークであり、Ruby に対する Rails フレームワークと似ています。Lift は Twitter と Foursquare で実際に使われています。
  • Mesos プロジェクト: Spark 自体は分散されたワークロードをサポートしていません。しかしこのクラスター・マネージャーを使用することにより、分散アプリケーションのネットワーク全体にわたってリソースの分離と共有を実現しています。
  • bashreduce (Bash スクリプト・ベースの実装)、GraphLab (機械学習を対象)、Storm (BackType から Twitter に買収され、Clojure で作成されたリアルタイムの分散ストリーム処理システム): Hadoop 以来、いくつかのビッグ・データ分析プラットフォームが登場しました。Spark を除き、これら 3 つの製品で並列コンピューティング・アーキテクチャーを実装することができます。
  • 皆さんの目的に最適な方法で IBM 製品を評価してください: 製品の試用版をダウンロードする方法、オンラインで製品を試す方法、クラウド環境で製品を使う方法、あるいは SOA Sandbox で数時間を費やし、サービス指向アーキテクチャーの効率的な実装方法を学ぶ方法などがあります。IBM では、いくつかの情報管理製品の試用版を提供しています。この記事の読者であればデータ分析に関心があるはずなので、IBM SPSS Text Analytics for SurveysIBM Cognos Business IntelligenceCognos Express を試してみてはいかがでしょう。

コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Open source, Information Management
ArticleID=776893
ArticleTitle=Spark: 高速なデータ分析のための新たな手段
publish-date=12022011