Spark によるデータ分析とパフォーマンス

Spark は Hadoop に代わる興味深いツールであり、インメモリー・データ処理に重点が置かれています。この記事の演習では、Scala、Spark、そして Spark のチューニング可能なパラメーターを使用した場合のマルチスレッドおよびマルチノードでのパフォーマンスを探ります。

M. Tim Jones, Independent author, Consultant

M.Tim JonesM. Tim Jones は組み込みソフトウェアのエンジニアであり、『Artificial Intelligence: A Systems Approach』、『GNU/Linux Application Programming』(現在、第 2 版です) や『AI Application Programming』(こちらも現在、第 2 版です)、それに『BSD Sockets Programming from a Multilanguage Perspective』などの著者でもあります。技術的な経歴は静止軌道衛星用のカーネル開発から、組み込みシステム・アーキテクチャーやネットワーク・プロトコル開発まで、広範にわたっています。彼はコロラド州ロングモン在住で、Intel に勤務するプラットフォーム・アーキテクトであり、執筆活動も行っています。



2012年 3月 15日

Tim とつながるには

Tim は developerWorks で人気の高いお馴染みの著者の 1 人です。Tim が書いたすべての developerWorks 記事を閲覧してみてください。また、developerWorks コミュニティーでは、Tim のプロフィールを調べることや、彼やその他の著者、そして他の開発者とつながることができます。

Spark は、インメモリー処理による極めて効率的なクラスター・コンピューティングを実現するために開発された、将来有望なビッグ・データ分析ソリューションです。Spark がターゲットとしている使用モデルには、反復アルゴリズムを組み込んだモデルも含まれています (つまり、より遅延の大きいファイルシステムにデータを格納するのではなく、メモリー内にデータを保持することによってメリットを得られる使用モデル)。この記事の演習に取り掛かる前に、Spark によるクラスター・コンピューティング手法、そしてこの手法と Hadoop によるクラスター・コンピューティング手法との違いについても十分に理解しておいてください。Spark の基礎知識と使用法については、先日公開された関連記事、「Spark: 高速なデータ分析のための新たな手段」で説明しています。

概要

この記事の演習では、以下の内容を実践します。

  • Scala 言語をインストールして実際に使用してみます
  • Scala のコレクションについて学びます
  • Spark をインストールして初めてのジョブを実行します
  • マルチスレッド化によってパフォーマンスを向上させます
  • 構成によってパフォーマンスを向上させます

前提条件

この演習一式には、Linux の基礎知識が必要です。これには、新しいアプリケーションをインストールできることも含まれます。Scala 言語の知識があれば役に立ちますが、必須というわけではありません。各演習で、必要なソフトウェア・パッケージのインストール方法を説明するので、これらの演習は順番どおりに行ってください。


演習 1: Scala 言語をインストールして実際に使用してみる

まず始めに、Scala 言語をインストールしてください。Scala のインストール・プロセスは、使用しているプラットフォームによって異なります。インストールがうまくいかない場合には、ソース・ツリーをダウンロードしてビルドを実行することでインストールすることができます。Spark に必要な Scala のバージョンは、パッケージ・マネージャーからインストールできるバージョンよりも新しいバージョンであるため、ソース・ツリーからインストールしてください。

インストールが完了したら、Scala インタープリター (「参考文献」で紹介している関連記事「Spark: 高速なデータ分析のための新たな手段」に、Scala インタープリターを実際に使用した例が説明されています) を起動し、いくつかのサンプルを試して (リスト 1 からリスト 3) その結果を検証します。


演習 2: Scala のコレクションについて学ぶ

Scala の興味深い特徴は、そのコレクション・ライブラリーにあります。Scala でのコレクションとは、リストやセット、あるいはマップなどといった要素をゼロ個以上含むコンテナーのことです。この概念は Spark にも関連してきます。Spark の分散データセットは、ローカル・コレクションとまったく同じように処理することができるためです。Scala のコレクションについては、「Scala 2.8 コレクション API」で詳しく説明しています。この参考文献を熟読して、配列およびリスト・コレクションを作成する方法を理解してください。

以下のステップを行ってください。

  1. int 型の配列を作成し、その配列に reverse メソッドを適用して、配列の要素を逆順に並び替えます。
  2. 文字列のリストを作成し、これらの文字列を繰り返し処理して個別に出力します。

演習 3: Spark をインストールして初めてのジョブを実行する

Spark の最新バージョンをダウンロードしてください。Spark を入手する最も簡単な方法は、以下のように git を使用することです。

$ git clone git://github.com/mesos/spark.git

上記のコマンドによって、./spark という名前の新規サブディレクトリーが作成されるので、cd を実行してカレント・ディレクトリーをこのサブディレクトリーに変更します。次に、以下の単純なビルド・ツール (sbt) を使って、ディストリビューションを更新してコンパイルします。

$ sbt/sbt update compile

このコマンドにより、複数のパッケージがダウンロードされて、さまざまな Scala ソースがコンパイルされます。構成を完了するには、カレント・ディレクトリーを ./spark/conf サブディレクトリーに変更し、spark-env.sh-template を spark-env.sh に名前変更した上で、以下の行を追加します。

export SCALA_HOME=/opt/scala-2.9.1.final

SCALA_HOME/bin を PATH に追加することも忘れないでください。

これで、Spark のインストールは完了したので、ローカル・ホスト上の 1 つのスレッドを使用して SparkPi サンプル・プログラムを実行します。このタスクを実行するには、関連記事「Spark: 高速なデータ分析のための新たな手段」(「参考文献」を参照) を参考にしてください。


演習 4: マルチスレッド化によってパフォーマンスを向上させる

この演習では、Spark でマルチスレッド化することによる違いを探ります。SparkPi サンプル・プログラムを使用して、実行ごとに関連付けるスレッドの数を変更することができます。

参考記事をガイドとして、ローカル・コンテキストで threads パラメーターをいろいろと試してみて、実行時間の違いを記録してください。


演習 5: 構成によってパフォーマンスを向上させる

Spark は、パフォーマンスを向上させるために使用できるいくつかの構成要素をサポートしています。Mesos を使用したクラスター構成について、以下の点を検討してください。

  • Spark のインメモリー処理という特徴を考えると、どの構成項目 (./conf を参照) がパフォーマンスの向上に寄与するのか?
  • 同様に (「参考文献」の「Spark FAQ」リンクを参照)、データセット・キャッシングのパフォーマンスを向上させるためのシステム・プロパティーは何か?

演習の実践結果

使用している Scala および Spark のバージョンによっては、ここに記載する出力とは異なる出力になる場合があります。

演習 1 の実践結果

Scala をインストールして、サンプルのいくつかを試してください (リスト 1 を参照)。関連記事「Spark: 高速なデータ分析のための新たな手段」(「参考文献」を参照) を読むと、Scala のディストリビューションから Scala をインストールして、利用できるようにする方法がわかります。export を実行することで環境変数に設定を追加し、これらの設定を永続的なものにすることができます。

リスト 1. Scala をインストールして検証する
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.9.1.final.tgz
$ sudo tar xvfz scala-2.9.1.final.tgz --directory /opt
$ export SCALA_HOME=/opt/scala-2.9.1.final
$ export PATH=$SCALA_HOME/bin:$PATH
$ echo $PATH
/opt/scala-2.9.1.final/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
$ 
$ scala
Welcome to Scala version 2.9.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

scala> :quit
$

演習 2 の実践結果

この演習のサンプルでは、Scala インタープリターを使用して結果を検証します。リスト 2 に、配列の演習を実践した結果を記載します。

リスト 2. 配列を作成して反転する
scala> val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> numbers.reverse
res1: Array[Int] = Array(9, 8, 7, 6, 5, 4, 3, 2, 1)

scala>

リスト 3 に、リストの演習を実践した結果を記載します。

リスト 3. 文字列のリストを作成して繰り返し処理する
scala> val animals = List("dog", "cat", "mouse")
animals: List[java.lang.String] = List(dog, cat, mouse)

scala> animals foreach println
dog
cat
mouse

scala> for (elem <- animals) println(elem)
dog
cat
mouse

scala>

演習 3 の実践結果

アプリケーションとホスト/スライスを指定した ./run コマンドを使用して、SparkPi テストを実行します。このタスクを行っているのが、リスト 4 です。

リスト 4. SparkPi テストをローカルで実行する
$ ./run spark.examples.SparkPi local
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.SparkContext: Starting job...
12/01/23 20:55:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/23 20:55:33 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Missing parents: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/23 20:55:33 INFO spark.LocalScheduler: Running task 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 0
12/01/23 20:55:34 INFO spark.LocalScheduler: Running task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/23 20:55:34 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/23 20:55:34 INFO spark.SparkContext: Job finished in 0.3042134 s
Pi is roughly 3.13768
$

演習 4 の実践結果

さまざまなスレッド数で SparkPi テスト・プログラムを実行するには、スレッドの数を local (ホスト) 引数で指定することで簡単に行えます。引数に指定する数は実行に関連付けるスレッドの数です。当然のことながら、システムで使用できるハードウェア・スレッドの数によって実行時間は異なってきます。リスト 5 の実践結果には、1 つのスレッドで実行した場合と、2 つのスレッドで実行した場合を示しています。

ご覧のように、1 つのスレッドを使用した最初の実行時間は 0.59 秒であるのに対し、2 つのスレッドを使用した 2 回目の実行は 0.09 秒で完了しています。ただし、皆さんの結果がこれと同じ速度であるとは限りません。

リスト 5. スレッドの数を変えて SparkPi を実行する
$ ./run spark.examples.SparkPi local[1]
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.SparkContext: Starting job...
12/01/24 18:50:41 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:41 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:41 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:42 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:42 INFO spark.SparkContext: Job finished in 0.595091783 s
Pi is roughly 3.12736
$ ./run spark.examples.SparkPi local[2]
12/01/24 18:50:46 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.SparkContext: Starting job...
12/01/24 18:50:46 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:46 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:46 INFO spark.SparkContext: Job finished in 0.098092002 s
Pi is roughly 3.14388
$

local を指定する代わりに、接続先の Mesos マスターを指定することもできます。この場合、単一ノードでの複数のスレッドではなく、ノードのクラスターがサポートされます (これは、よりパフォーマンスに優れた方法です)。

使用できるハードウェア・スレッド (仮想 CPU) の数を調べるには、以下のコマンドを実行します。

$ grep processor /proc/cpuinfo

演習 5 の実践結果

環境の主要な要素を定義するのは Spark 構成ファイル (./conf/spark-env.sh) ですが、各ノードの Java 仮想マシンをサポートするためのメモリー・サイズを指定するオプションが 1 つ (SPARK_MEM) あります。Spark ではインメモリーのデータセットに重点が置かれることから、メモリー・サイズを大きくすれば、パフォーマンスが向上することになります (結果はワークロードに依存します)。

Spark FAQ で明確に示されているように、データセットによっては、メモリー内に完全に収まりきらないこともあります。その場合、Spark はメモリー内に収まらなかったパーティションを再計算するか (デフォルト)、パーティションをディスクにキャッシュします (そのように構成されている場合)。パーティションを再計算する代わりにディスクにキャッシュするには、spark.DiskSpillingCache プロパティーを使用します。

参考文献

学ぶために

製品や技術を入手するために

  • 最新バージョンの Scala をソース・ツリーからインストールしてください。
  • ご自分に最適な方法で IBM 製品を評価してください。評価の方法としては、製品の試用版をダウンロードすることも、オンラインで製品を試してみることも、クラウド環境で製品を使用することもできます。また、SOA Sandbox では、数時間でサービス指向アーキテクチャーの実装方法を効率的に学ぶことができます。

議論するために

  • developerWorks コミュニティーに参加してください。ここでは他の developerWorks ユーザーとのつながりを持てる他、開発者によるブログ、フォーラム、グループ、ウィキを調べることができます。

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Linux, Open source
ArticleID=801283
ArticleTitle=Spark によるデータ分析とパフォーマンス
publish-date=03152012