利用 Spark 进行数据分析和性能改进
Spark 是一种前景无限的大数据分析解决方案,专为使用内存处理的高效集群计算而开发。其目标使用模型包括整合了迭代式算法的模型(也就是说,能够受益于将数据保留在内存之中,而非将其推送到杨恩较高的文件系统的模型)。在动手尝试这些练习之前,请务必理解 Spark 的集群计算方法及其与 Hadoop 的不同之处。请阅读最近发表的一篇相关文章 Spark,一种快速数据分析替代方案,以便了解 Spark 的背景知识和使用方法。
概述
这些练习能为您提供以下领域中的实践经验:
- 安装和试用 Scala 语言
- 了解 Scala 集合
- 安装 Spark 并运行您的第一个作业
- 通过多线程提高性能
- 通过配置提高性能
先决条件
这一组练习要求您具备一定的 Linux® 基础知识,包括安装新应用程序的能力。您最好具备 Scala 语言方面的知识,但这并非必要条件。您必须依次完成这些练习,因为它们演示了必要软件包的安装。
练习 1:安装和试用 Scala 语言
首先安装 Scala 语言。安装 Scala 的过程根据您使用的平台的不同而有所不同。在最麻烦的情况下,您可以下载源文件树,执行生成和安装。由于 Spark 需要最新版本的 Scala,比软件包管理器提供的版本更新,因此请通过 源文件树 安装。
安装完成后,启动 Scala 解释器(已在 参考资料 部分中列出的相关文章 “Spark,一种快速数据分析替代方案” 中进行了说明),尝试一些示例(从清单 1 到清单 3),用它们来验证您的结果。
练习 2:了解 Scala 集合
Scala 的一种有趣的特性就是集合库。Scala 中的集合 是一种包含零个或多个列表、集或映射等内容的容器。这种概念与 Spark 密切相关,因为其分布式数据集可像本地集合一样运作。您可以在 Scala 2.8 集合 API 中进一步了解 Scala 集合。详细阅读这份参考资料,了解如何创建数组和列表集合。
执行以下步骤:
- 创建一个
int
数组,为其应用reverse
方法来反转其内容。 - 创建一个字符串列表,并通过迭代分别打印出每个字符串。
练习 3:安装 Spark 并运行您的第一个作业
下载最新版本的 Spark。获取 Spark 的最简单的方法就是利用 git
:
$ git clone git://github.com/mesos/spark.git
这条命令行将在该子目录中得到一个新的子目录 ./spark.cd
。现在,使用简单的生成工具 (sbt
) 更新和编译发布版:
$ sbt/sbt update compile
这将下载几个软件包,此外还会编译一些 Scala 源。为了完成配置,请进入 ./spark/conf 子目录,将 spark-env.sh-template 重命名为 spark-env.sh,并添加以下代码行:
export SCALA_HOME=/opt/scala-2.9.1.final
请不要忘记将 SCALA_HOME/bin 添加到您的路径之中。
安装 Spark 之后,在本地主机上使用一个线程运行 SparkPi 示例程序。使用相关文章作为指南完成本任务。(请参见 参考资料 部分中列出的文章 “Spark,一种快速数据分析替代方法”。)
练习 4:通过多线程提高性能
这个练习将探索多线程和 Spark 的不同之处。利用 SparkPi 示例程序,您就可以更改与一次特定执行相关的线程数量。
以参考文章作为指南,尝试在本地上下文中使用 threads
参数,并注明执行时间的差异。
练习 5:通过配置提高性能
Spark 支持多种能够实现更高性能的配置元素。可以考虑 Spark 与 Mesos 的集群化配置:
- 考虑到 Spark 的内存处理特性,哪些配置项(请参见 ./conf)有助于提高性能?
- 同样(请参见 参考资料 部分中的 Spark 常见问题链接),哪些系统属性能够提高数据集缓存的性能?
练习解答
根据您的 Scala 和 Spark 版本的不同,某些输出结果可能有所不同。
练习 1 的解答
执行 Scala 安装,随后尝试部分示例(如 清单 1 所示)。通过 参考资料 部分中列出的相关文章 “Spark,一种快速数据分析替代方案”,您可以看到如何通过 Scala 的发布版安装 Scala 并使之可供访问。您可以增加将结果导出到环境的步骤,以便持久保留结果。
清单 1. 安装和试用 Scala
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.9.1.final.tgz $ sudo tar xvfz scala-2.9.1.final.tgz --directory /opt $ export SCALA_HOME=/opt/scala-2.9.1.final $ export PATH=$SCALA_HOME/bin:$PATH $ echo $PATH /opt/scala-2.9.1.final/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin $ $ scala Welcome to Scala version 2.9.1.final (OpenJDK Client VM, Java 1.6.0_20). Type in expressions to have them evaluated. Type :help for more information. scala> def square(x: Int) = x*x square: (x: Int)Int scala> square(3) res0: Int = 9 scala> square(res0) res1: Int = 81 scala> :quit $
练习 2 的解答
对于这些示例,请使用 Scala 解释器来验证您的结果。清单 2 提供了数组练习的解答。
清单 2. 创建并反转一个数组
scala> val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> numbers.reverse res1: Array[Int] = Array(9, 8, 7, 6, 5, 4, 3, 2, 1) scala>
清单 3 提供了列表练习的解答。
清单 3. 创建并迭代一个字符串列表
scala> val animals = List("dog", "cat", "mouse") animals: List[java.lang.String] = List(dog, cat, mouse) scala> animals foreach println dog cat mouse scala> for (elem <- animals) println(elem) dog cat mouse scala>
练习 3 的解答
您可通过 ./run
命令执行 SparkPi 测试,指定应用程序和主机/切片。清单 4 执行了这一任务。
清单 4. 在本地执行 SparkPi 测试
$ ./run spark.examples.SparkPi local 12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registered actor on port 7077 12/01/23 20:55:33 INFO spark.MapOutputTrackerActor: Registered actor on port 7077 12/01/23 20:55:33 INFO spark.SparkContext: Starting job... 12/01/23 20:55:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache 12/01/23 20:55:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions 12/01/23 20:55:33 INFO spark.CacheTrackerActor: Asked for current cache locations 12/01/23 20:55:33 INFO spark.LocalScheduler: Final stage: Stage 0 12/01/23 20:55:33 INFO spark.LocalScheduler: Parents of final stage: List() 12/01/23 20:55:33 INFO spark.LocalScheduler: Missing parents: List() 12/01/23 20:55:33 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents 12/01/23 20:55:33 INFO spark.LocalScheduler: Running task 0 12/01/23 20:55:33 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes 12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 0 12/01/23 20:55:34 INFO spark.LocalScheduler: Running task 1 12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 0) 12/01/23 20:55:34 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes 12/01/23 20:55:34 INFO spark.LocalScheduler: Finished task 1 12/01/23 20:55:34 INFO spark.LocalScheduler: Completed ResultTask(0, 1) 12/01/23 20:55:34 INFO spark.SparkContext: Job finished in 0.3042134 s Pi is roughly 3.13768 $
练习 4 的解答
使用不同数量的线程执行 SparkPi 测试程序非常简单,只需使用 local
(host) 参数指定即可。所指定的数量是与该次运行相关的线程数量。当然,其运作方式根据您的系统中的硬件线程数量的不同而有所不同。清单 5 中的解答演示了一个和两个线程的执行。
如上面的清单所示,第一次使用单线程运行需要 0.59 秒,而第二次使用双线程运行仅用了 0.9 秒。您的具体速度可能有所不同。
清单 5. 使用不同数量的线程执行 SparkPi
$ ./run spark.examples.SparkPi local[1] 12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registered actor on port 7077 12/01/24 18:50:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077 12/01/24 18:50:41 INFO spark.SparkContext: Starting job... 12/01/24 18:50:41 INFO spark.CacheTracker: Registering RDD ID 0 with cache 12/01/24 18:50:41 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions 12/01/24 18:50:41 INFO spark.CacheTrackerActor: Asked for current cache locations 12/01/24 18:50:41 INFO spark.LocalScheduler: Final stage: Stage 0 12/01/24 18:50:41 INFO spark.LocalScheduler: Parents of final stage: List() 12/01/24 18:50:41 INFO spark.LocalScheduler: Missing parents: List() 12/01/24 18:50:41 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents 12/01/24 18:50:41 INFO spark.LocalScheduler: Running task 0 12/01/24 18:50:41 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes 12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 0 12/01/24 18:50:42 INFO spark.LocalScheduler: Running task 1 12/01/24 18:50:42 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes 12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 0) 12/01/24 18:50:42 INFO spark.LocalScheduler: Finished task 1 12/01/24 18:50:42 INFO spark.LocalScheduler: Completed ResultTask(0, 1) 12/01/24 18:50:42 INFO spark.SparkContext: Job finished in 0.595091783 s Pi is roughly 3.12736 $ ./run spark.examples.SparkPi local[2] 12/01/24 18:50:46 INFO spark.MapOutputTrackerActor: Registered actor on port 7077 12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registered actor on port 7077 12/01/24 18:50:46 INFO spark.SparkContext: Starting job... 12/01/24 18:50:46 INFO spark.CacheTracker: Registering RDD ID 0 with cache 12/01/24 18:50:46 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions 12/01/24 18:50:46 INFO spark.CacheTrackerActor: Asked for current cache locations 12/01/24 18:50:46 INFO spark.LocalScheduler: Final stage: Stage 0 12/01/24 18:50:46 INFO spark.LocalScheduler: Parents of final stage: List() 12/01/24 18:50:46 INFO spark.LocalScheduler: Missing parents: List() 12/01/24 18:50:46 INFO spark.LocalScheduler: Submitting Stage 0, has no missing parents 12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 0 12/01/24 18:50:46 INFO spark.LocalScheduler: Running task 1 12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 0 is 1481 bytes 12/01/24 18:50:46 INFO spark.LocalScheduler: Size of task 1 is 1481 bytes 12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 1 12/01/24 18:50:46 INFO spark.LocalScheduler: Finished task 0 12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 0) 12/01/24 18:50:46 INFO spark.LocalScheduler: Completed ResultTask(0, 1) 12/01/24 18:50:46 INFO spark.SparkContext: Job finished in 0.098092002 s Pi is roughly 3.14388 $
请注意,除了 local
之外,您还可以提供一个 Mesos 主机作为连接目标,这支持节点集群,而非单一节点上的多线程(也是更高性能的选项)。
为了确定您可以利用的硬件线程(虚拟 CPU)有多少,请运行以下命令行:
$ grep processor /proc/cpuinfo
练习 5 的解答
尽管 Spark 配置文件 (./conf/spark-env.sh) 定义了环境的关键要素,但有一个选项 (SPARK_MEM
) 指定了支持为各节点的 Java™ 虚拟机使用多少内存。考虑到 Spark 关注内存数据集,因此内存越大,性能就会越高(依赖于工作负载)。
如 Spark 常见问题中所定义的那样,某些数据集可能并不适合完全置于内存之中。如果出现这种情况,Spark 将重新计算不适合的分区(默认),如果另行配置,也可以将不适合置于内存中的分区缓存在磁盘之中。要将分区转出到磁盘,而不是重新计算,请使用 spark.DiskSpillingCache
属性。
相关主题
- 在 Scala 编程语言 项目网站中进一步了解这种语言。
- 在 Spark 集群计算框架 项目网站中进一步了解这种框架。
- 阅读作者的另外一篇文章 Spark,一种快速数据分析替代方案,了解 Spark 的背景知识和使用方法。(developerWorks,2011 年 11 月)。
- 详细阅读 Scala 2.8 集合 API,获得有关 Scala 集合以及如何创建数组和列表集合的信息。(Martin Odersky 与 Lex Spoon,2010 年 9 月)。
- 访问 Spark 常见问题,获得更多相关信息。
- 通过 源文件树 安装最新版本的 Scala
- 访问 developerWorks 中国网站开源技术专区,获得有关开放源码工具和开放源码技术应用的丰富信息。
- 按照最适合您的方式 IBM 产品评估试用版软件:下载产品试用版、在线试用产品、在云环境中使用产品,或者抽出几个小时的时间通过 IBM SOA 人员沙箱 了解如何有效地实现面向服务架构。
- 在 developerWorks Linux 专区 寻找为 Linux 开发人员(包括 Linux 新手入门)准备的更多参考资料,查阅我们 最受欢迎的文章和教程。
- 在 developerWorks 上查阅所有 Linux 技巧 和 Linux 教程。
- 随时关注 developerWorks 技术活动和网络广播。