利用 Spark 进行数据分析和性能改进

Spark 是 Hadoop 的一种有趣的替代产品,它重点关注内存数据处理。这篇练习课程探索了 Scala、Spark 及其可调优参数的多线程和多节点性能。

M. Tim Jones, 独立作家, 顾问

M.Tim JonesM. Tim Jones 是一名嵌入式固件架构师,也是 Artificial Intelligence:A Systems Approach、GNU/Linux Application Programming(现已发行第二版)、AI Application Programming(第二版)和 BSD Sockets Programming from a Multilanguage Perspective 等书籍的作者。他的工程背景非常广泛,从对地同步宇宙飞船的内核开发到嵌入式系统架构设计与网络协议开发。Tim 居住在科罗拉多州 Longmont,他是 Intel 的一名平台架构师,也是一名作家。



2012 年 3 月 19 日

Spark 是一种前景无限的大数据分析解决方案,专为使用内存处理的高效集群计算而开发。其目标使用模型包括整合了迭代式算法的模型(也就是说,能够受益于将数据保留在内存之中,而非将其推送到杨恩较高的文件系统的模型)。在动手尝试这些练习之前,请务必理解 Spark 的集群计算方法及其与 Hadoop 的不同之处。请阅读最近发表的一篇相关文章 Spark,一种快速数据分析替代方案,以便了解 Spark 的背景知识和使用方法。

概述

这些练习能为您提供以下领域中的实践经验:

  • 安装和试用 Scala 语言
  • 了解 Scala 集合
  • 安装 Spark 并运行您的第一个作业
  • 通过多线程提高性能
  • 通过配置提高性能

先决条件

这一组练习要求您具备一定的 Linux® 基础知识,包括安装新应用程序的能力。您最好具备 Scala 语言方面的知识,但这并非必要条件。您必须依次完成这些练习,因为它们演示了必要软件包的安装。


练习 1:安装和试用 Scala 语言

首先安装 Scala 语言。安装 Scala 的过程根据您使用的平台的不同而有所不同。在最麻烦的情况下,您可以下载源文件树,执行生成和安装。由于 Spark 需要最新版本的 Scala,比软件包管理器提供的版本更新,因此请通过 源文件树 安装。

安装完成后,启动 Scala 解释器(已在 参考资料 部分中列出的相关文章 “Spark,一种快速数据分析替代方案” 中进行了说明),尝试一些示例(从清单 1 到清单 3),用它们来验证您的结果。


练习 2:了解 Scala 集合

Scala 的一种有趣的特性就是集合库。Scala 中的集合 是一种包含零个或多个列表、集或映射等内容的容器。这种概念与 Spark 密切相关,因为其分布式数据集可像本地集合一样运作。您可以在 Scala 2.8 集合 API 中进一步了解 Scala 集合。详细阅读这份参考资料,了解如何创建数组和列表集合。

执行以下步骤:

  1. 创建一个 int 数组,为其应用 reverse 方法来反转其内容。
  2. 创建一个字符串列表,并通过迭代分别打印出每个字符串。

练习 3:安装 Spark 并运行您的第一个作业

下载最新版本的 Spark。获取 Spark 的最简单的方法就是利用 git

 $ git clone git://github.com/mesos/spark.git

这条命令行将在该子目录中得到一个新的子目录 ./spark.cd。现在,使用简单的生成工具 (sbt) 更新和编译发布版:

 $ sbt/sbt update compile

这将下载几个软件包,此外还会编译一些 Scala 源。为了完成配置,请进入 ./spark/conf 子目录,将 spark-env.sh-template 重命名为 spark-env.sh,并添加以下代码行:

 export SCALA_HOME=/opt/scala-2.9.1.final

请不要忘记将 SCALA_HOME/bin 添加到您的路径之中。

安装 Spark 之后,在本地主机上使用一个线程运行 SparkPi 示例程序。使用相关文章作为指南完成本任务。(请参见 参考资料 部分中列出的文章 “Spark,一种快速数据分析替代方法”。)


练习 4:通过多线程提高性能

这个练习将探索多线程和 Spark 的不同之处。利用 SparkPi 示例程序,您就可以更改与一次特定执行相关的线程数量。

以参考文章作为指南,尝试在本地上下文中使用 threads 参数,并注明执行时间的差异。


练习 5:通过配置提高性能

Spark 支持多种能够实现更高性能的配置元素。可以考虑 Spark 与 Mesos 的集群化配置:

  • 考虑到 Spark 的内存处理特性,哪些配置项(请参见 ./conf)有助于提高性能?
  • 同样(请参见 参考资料 部分中的 Spark 常见问题链接),哪些系统属性能够提高数据集缓存的性能?

练习解答

根据您的 Scala 和 Spark 版本的不同,某些输出结果可能有所不同。

练习 1 的解答

执行 Scala 安装,随后尝试部分示例(如 清单 1 所示)。通过 参考资料 部分中列出的相关文章 “Spark,一种快速数据分析替代方案”,您可以看到如何通过 Scala 的发布版安装 Scala 并使之可供访问。您可以增加将结果导出到环境的步骤,以便持久保留结果。

清单 1. 安装和试用 Scala
 $ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.9.1.final.tgz
$ sudo tar xvfz scala-2.9.1.final.tgz --directory /opt
$ export SCALA_HOME=/opt/scala-2.9.1.final
$ export PATH=$SCALA_HOME/bin:$PATH
$ echo $PATH
/opt/scala-2.9.1.final/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
$ 
$ scala
Welcome to Scala version 2.9.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

scala> :quit
$

练习 2 的解答

对于这些示例,请使用 Scala 解释器来验证您的结果。清单 2 提供了数组练习的解答。

清单 2. 创建并反转一个数组
 scala> val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> numbers.reverse
res1: Array[Int] = Array(9, 8, 7, 6, 5, 4, 3, 2, 1)

scala>

清单 3 提供了列表练习的解答。

清单 3. 创建并迭代一个字符串列表
 scala> val animals = List("dog", "cat", "mouse")
animals: List[java.lang.String] = List(dog, cat, mouse)

scala> animals foreach println
dog
cat
mouse

scala> for (elem <- animals) println(elem)
dog
cat
mouse

scala>

练习 3 的解答

您可通过 ./run 命令执行 SparkPi 测试,指定应用程序和主机/切片。清单 4 执行了这一任务。

清单 4. 在本地执行 SparkPi 测试
$ ./run spark.examples.SparkPi local
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/23 20:55:33 INFO spark.SparkContext: Starting job...
12/01/23 20:55:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/23 20:55:33 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/23 20:55:33 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Missing parents: List()
12/01/23 20:55:33 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/23 20:55:33 INFO spark.LocalScheduler: Running task 0
12/01/23 20:55:33 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 0
12/01/23 20:55:34 INFO spark.LocalScheduler: Running task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/23 20:55:34 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 1
12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/23 20:55:34 INFO spark.SparkContext: Job finished in 0.3042134 s
Pi is roughly 3.13768
$

练习 4 的解答

使用不同数量的线程执行 SparkPi 测试程序非常简单,只需使用 local (host) 参数指定即可。所指定的数量是与该次运行相关的线程数量。当然,其运作方式根据您的系统中的硬件线程数量的不同而有所不同。清单 5 中的解答演示了一个和两个线程的执行。

如上面的清单所示,第一次使用单线程运行需要 0.59 秒,而第二次使用双线程运行仅用了 0.9 秒。您的具体速度可能有所不同。

清单 5. 使用不同数量的线程执行 SparkPi
$ ./run spark.examples.SparkPi local[1]
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:41 INFO spark.SparkContext: Starting job...
12/01/24 18:50:41 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:41 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:41 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:41 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:41 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:41 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:42 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:42 INFO spark.SparkContext: Job finished in 0.595091783 s
Pi is roughly 3.12736
$ ./run spark.examples.SparkPi local[2]
12/01/24 18:50:46 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/01/24 18:50:46 INFO spark.SparkContext: Starting job...
12/01/24 18:50:46 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
12/01/24 18:50:46 INFO spark.CacheTrackerActor: Asked for current cache locations
12/01/24 18:50:46 INFO spark.LocalScheduler: Final stage: Stage 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Parents of final stage: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Missing parents: List()
12/01/24 18:50:46 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 1
12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 0
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
12/01/24 18:50:46 INFO spark.SparkContext: Job finished in 0.098092002 s
Pi is roughly 3.14388
$

请注意,除了 local 之外,您还可以提供一个 Mesos 主机作为连接目标,这支持节点集群,而非单一节点上的多线程(也是更高性能的选项)。

为了确定您可以利用的硬件线程(虚拟 CPU)有多少,请运行以下命令行:

 $ grep processor /proc/cpuinfo

练习 5 的解答

尽管 Spark 配置文件 (./conf/spark-env.sh) 定义了环境的关键要素,但有一个选项 (SPARK_MEM) 指定了支持为各节点的 Java™ 虚拟机使用多少内存。考虑到 Spark 关注内存数据集,因此内存越大,性能就会越高(依赖于工作负载)。

如 Spark 常见问题中所定义的那样,某些数据集可能并不适合完全置于内存之中。如果出现这种情况,Spark 将重新计算不适合的分区(默认),如果另行配置,也可以将不适合置于内存中的分区缓存在磁盘之中。要将分区转出到磁盘,而不是重新计算,请使用 spark.DiskSpillingCache 属性。

参考资料

学习

获得产品和技术

讨论

  • 加入 developerWorks 中文社区,developerWorks 社区是一个面向全球 IT 专业人员,可以提供博客、书签、wiki、群组、联系、共享和协作等社区功能的专业社交网络社区。

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Linux, Open source
ArticleID=806301
ArticleTitle=利用 Spark 进行数据分析和性能改进
publish-date=03192012