Анализ данных с помощью Spark и его производительность

Spark - интересная альтернатива Hadoop с акцентом на обработку данных в оперативной памяти. Этот семинар посвящен производительности многопоточной и многоузловой обработки с помощью Scala и Spark, а также параметрам их настройки.

M. Тим Джонс, инженер-консультант, Emulex Corp.

М. Тим ДжонсМ. Тим Джонс - архитектор встроенного ПО и, кроме того, автор книг Artificial Intelligence: A Systems Approach, GNU/Linux Application Programming (выдержавшей на данный момент второе издание), AI Application Programming (второе издание) и BSD Sockets Programming from a Multilanguage Perspective. Он имеет обширный опыт разработки ПО в самых разных предметных областях - от ядер специальных ОС для геосинхронных космических аппаратов до архитектур встраиваемых систем и сетевых протоколов. Тим - инженер-консультант Emulex Corp., Лонгмонт, Колорадо.



04.12.2012

Связаться с Тимом

Тим ― один из самых популярных и плодовитых наших авторов. Вот все его статьи, опубликованные на портале developerWorks. Познакомьтесь с профилем Тима и поддерживайте связь с ним и другими авторами и разработчиками через сообщество developerWorks.

Spark - это многообещающее решение для анализа "больших данных", предназначенное для высокоэффективных кластерных вычислений с использованием обработки в оперативной памяти. В число целевых моделей его использования входят модели с итерационными алгоритмами (то есть, те, что могут извлекать пользу из хранения данных в памяти, вместо их передачи в более медлительную файловую систему). Прежде чем выполнять эти упражнения, убедитесь, что вы хорошо понимаете подход Spark к кластерным вычислениям и его отличия от Hadoop. Читайте о Spark и его использовании в недавней статье Spark, альтернатива для быстрого анализа данных.

Обзор

Эти упражнения обеспечат вам практику в следующих областях:

  • установка интерпретатора Scala и экспериментирование с ним;
  • изучение коллекций Scala;
  • установка Spark и выполнение своего первого задания;
  • повышение производительности с использованием многопоточности;
  • повышение производительности с помощью настройки.

Предварительные замечания

Для выполнения этих упражнений требуются некоторые базовые знания в области Linux®, включая умение устанавливать новые приложения. Знание языка Scala полезно, но не обязательно. Эти упражнения нужно выполнять по порядку, так как они иллюстрируют процесс установки необходимых пакетов ПО.


Упражнение 1: установка языка Scala и экспериментирование с ним

Начнем с установки языка Scala. Процесс установки Scala зависит от платформы. В худшем случае придется загрузить дерево исходного кода и выполнить сборку и установку. Поскольку для Spark требуется более поздняя версия Scala, чем та, что доступна через менеджеры пакетов, установите ее из дерева исходного кода.

После установки запустите интерпретатор Scala (см. статью «Spark, альтернатива для быстрого анализа данных» в разделе Ресурсы), попробуйте выполнить некоторые примеры (листинги 1-3) и проверьте результаты.


Упражнение 2: изучение коллекций Scala

Интересной особенностью Scala являются библиотеки коллекций этого языка. В Scala коллекция ― это контейнер, содержащий некоторое количество объектов, такой как список, множество или карта. Эта концепция относится и к Spark, так как с его распределенными наборами данных можно работать как с локальной коллекцией. Подробнее о коллекциях Scala можно прочесть в документе The Scala 2.8 Collections API. Внимательно изучите его, чтобы понять, как создать коллекцию типа массива и списка.

Выполните следующие действия.

  1. Создайте массив целых чисел (int) и примените к нему метод reverse, чтобы выстроить их в обратном порядке.
  2. Создайте список строк и распечатайте каждую из них.

Упражнение 3: установка Spark и выполнение своего первого задания

Загрузите последнюю версию Spark. Проще всего это сделать с помощью команды git:

$ git clone git://github.com/mesos/spark.git

Эта командная строка приводит в новый подкаталог ./spark. Выполните команду cd в этом подкаталоге. Теперь обновите и скомпилируйте дистрибутив с помощью простого инструмента сборки (sbt):

$ sbt/sbt update compile

Это приведет к загрузке нескольких пакетов помимо компиляции ряда исходных файлов Scala. Для завершения настройки перейдите в подкаталог ./spark/conf, переименуйте spark-env.sh-template в spark-env.sh и добавьте следующую строку:

export SCALA_HOME=/opt/scala-2.9.1.final

Не забудьте также добавить SCALA_HOME/bin в переменную PATH.

Теперь, когда Spark установлен, запустите программу-пример SparkPi с одним потоком на локальной машине. В качестве руководства по решению этой задачи используйте статью "Spark, альтернатива для быстрого анализа данных" (см. раздел Ресурсы).


Упражнение 4: Повышение производительности с использованием многопоточности

В этом упражнении исследуется разница при многопоточном выполнении программы Spark. Используя программу-пример SparkPi, можно изменять количество потоков в конкретном исполнении.

Руководствуясь указанной статьей, поэкспериментируйте с параметром threads в локальном контексте и обратите внимание на разницу во времени выполнения.


Упражнение 5: повышение производительности с помощью настройки

Spark поддерживает несколько параметров конфигурации, которые позволяют добиться более высокой производительности. Имея в виду кластерную конфигурацию Spark с Mesos:

  • Какие параметры конфигурации (см. ./conf) могут способствовать повышению производительности, учитывая способность Spark обрабатывать данные в памяти?
  • Аналогично (см. ссылку на FAQ по Spark в разделе Ресурсы), какое свойство системы может повысить производительность при кэшировании набора данных?

Решения упражнений

Некоторые результаты могут отличаться в зависимости от версии Scala и Spark.

Решение упражнения 1

Выполните установку Scala и попробуйте выполнить некоторые примеры (см. листинг 1). В статье «Spark, альтернатива для быстрого анализа данных» из раздела Ресурсы показано, как установить интерпретатор Scala из дистрибутива и получить доступ к нему. Можно добавить экспортируемые артефакты в свою среду, чтобы сделать их персистентными.

Листинг 1. Установка интерпретатора Scala и экспериментирование с ним
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.9.1.final.tgz
$ sudo tar xvfz scala-2.9.1.final.tgz --directory /opt
$ export SCALA_HOME=/opt/scala-2.9.1.final
$ export PATH=$SCALA_HOME/bin:$PATH
$ echo $PATH
/opt/scala-2.9.1.final/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
$ 
$ scala
Welcome to Scala version 2.9.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

scala> :quit
$

Решение упражнения 2

В этих примерах для проверки своих результатов используйте интерпретатор Scala. Листинг 2 содержит решение упражнения для массива.

Листинг 2. Создание массива и его перестройка
scala> val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> numbers.reverse
res1: Array[Int] = Array(9, 8, 7, 6, 5, 4, 3, 2, 1)

scala>

Листинг 3 содержит решение упражнения для списка.

Листинг 3. Создание и перебор списка строк
scala> val animals = List("dog", "cat", "mouse")
animals: List[java.lang.String] = List(dog, cat, mouse)

scala> animals foreach println
dog
cat
mouse

scala> for (elem <- animals) println(elem)
dog
cat
mouse

scala>

Решение упражнения 3

Выполните пример SparkPi с помощью команды ./run, в которой указано приложение и параметры host/slices. Эта задача решена в листинге 4.

Листинг 4. Локальное выполнение теста SparkPi
$ ./run spark.examples.SparkPi local
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.SparkContext: Starting job...
12/01/23 20:55:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/23 20:55:33 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Missing parents: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/23 20:55:33 INFO spark.LocalScheduler: Running task 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 0
12/01/23 20:55:34 INFO spark.LocalScheduler: Running task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/23 20:55:34 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/23 20:55:34 INFO spark.SparkContext: Job finished in 0.3042134 s
Pi is roughly 3.13768
$

Решение упражнения 4

Пример SparkPi с разным количеством потоков легко выполнить, указав аргумент local (host). Заданное число — это количество потоков, с которым выполняется программа. Конечно, она будет работать по-разному в зависимости от количества аппаратных потоков в системе. Решение, приведенное в листинге 5, иллюстрирует выполнение с одним и двумя потоками.

Как видно, для первого прогона с одним потоком потребовались 0,59 с, в то время как второй прогон с двумя потоками закончился за 0,09 с. В вашем случае скорости могут быть другими.

Листинг 5. Выполнение программы SparkPi с разным количеством потоков
$ ./run spark.examples.SparkPi local[1]
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.SparkContext: Starting job...
12/01/24 18:50:41 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:41 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:41 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:42 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:42 INFO spark.SparkContext: Job finished in 0.595091783 s
Pi is roughly 3.12736
$ ./run spark.examples.SparkPi local[2]
12/01/24 18:50:46 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.SparkContext: Starting job...
12/01/24 18:50:46 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:46 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:46 INFO spark.SparkContext: Job finished in 0.098092002 s
Pi is roughly 3.14388
$

Обратите внимание, что вместо local можно было бы указать мастер Mesos, который поддерживает не несколько потоков на одном узле, а кластер узлов (более высокопроизводительный вариант).

Чтобы выяснить, как управлять аппаратными потоками (виртуальными ЦП), выполните следующую команду:

$ grep processor /proc/cpuinfo

Решение упражнения 5

Хотя ключевые элементы среды определяет файл конфигурации Spark (./conf/spark-env.sh), существует параметр (SPARK_MEM), который управляет количеством памяти, выделяемой каждому узлу виртуальной машины Java™. Учитывая, что Spark ориентирован на работу с наборами данных в оперативной памяти, увеличение объема памяти приведет к повышению производительности (в зависимости от нагрузки).

Как указано в разделе FAQ по Spark, некоторые наборы данных могут не помещаться в память целиком. В этом случае Spark либо повторит вычисление с разделом, который не поместился (по умолчанию), либо, если это настроено, кэширует раздел на диск. Для переноса раздела на диск вместо повторного вычисления используется параметр spark.DiskSpillingCache.

Ресурсы

Научиться

Получить продукты и технологии

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Linux, Open source
ArticleID=848507
ArticleTitle=Анализ данных с помощью Spark и его производительность
publish-date=12042012