内容


用 Hadoop 进行分布式数据处理,第 3 部分

应用程序开发

为 Hadoop 开发 Ruby MapReduce 应用程序

Comments

系列内容:

此内容是该系列 # 部分中的第 # 部分: 用 Hadoop 进行分布式数据处理,第 3 部分

敬请期待该系列的后续内容。

此内容是该系列的一部分:用 Hadoop 进行分布式数据处理,第 3 部分

敬请期待该系列的后续内容。

此系列的前两篇文章 专注于单节点和多节点集群的 Hadoop 安装及配置。最后这篇文章探索了 Hadoop 编程 — 特别是在 Ruby 语言中 map 和 reduce 应用程序开发。我之所以选择 Ruby,首先是因为,它是一个您应该知道的很棒的面向对象的脚本语言,其次,您将在 参考资料 部分发现很多参考,其中包括解决 Java™ 和 Python 语言的教程。通过这种 MapReduce 编程的探索,将向您介绍流式应用程序编程接口(Application Programming Interface,API)。此 API 提供方法以便在 Java 语言以外的多种语言中开发应用程序。

让我们开始简要介绍一下 map 和 reduce(从功能的角度考虑),然后再进一步钻研 Hadoop 编程模型及其体系结构和用来雕刻、分配、管理工作的元素。

map 和 reduce 的起源

是什么功能性元素激发了 MapReduce 编程范例的创立?在 1958 年,John McCarthy 发明了名为 Lisp 的语言,其实现了数值和符号计算,但在递归形式下此语言非常不同于现在所使用的大多数语言。(在维基百科全书上记述着 Lisp 那段迷人的历史,同时包括一个有用的教程 — 值得您花费时间来阅读。)Lisp 最先是在 IBM® 704 中实现的,IBM® 704 是第一种大规模生产的计算机,也支持其他旧的语言,如 FORTRAN。

map 函数,源于功能性语言(如 Lisp)但如今在其他语言中也很常见,其中包含了一系列元素的函数的应用程序。这意味着什么? 清单 1 通过 Scheme Shell (SCSH) 提供解释会话,即一个 Lisp 衍生。第一行定义一个名为 square 的函数,该函数可接受参数并发出其平方根。下一行说明 map 函数的使用。如图所示,通过 map,为已应用的函数提供您的函数和一系列元素。结果是一个包含平方元素的新列表。

清单 1. SCSH 上的 map 函数演示
> (define square (lambda (x) (* x x)))
> (map square '(1 3 5 7))
'(1 9 25 49)
>

Reduce 也适用于列表但是通常将列表缩减为标量值。清单 2中提供的示例说明用于将列表缩减为标量的其他 SCSH 函数 — 在这种情况下,用 (1 + (2 + (3 + (4 + (5))))) 的格式汇总值的列表。请注意这是典型的功能性编程,取决于迭代上的递归。

清单 2. SCSH 上的 reduce 演示
> (define (list-sum lis) (if (null? lis) 0 (+ (car lis) (list-sum (cdr lis)))))
> (list-sum '(1 2 3 4 5))
15
>

有趣的是要注意递归与迭代在命令性语言中同样高效,因为递归在幕后被转化成迭代。

Hadoop 的编程模型

Google 引用 MapReduce 的概念作为处理或生成大型数据集的编程模型。在规范模型中,map 函数处理键值对,这将得出键值对的中间集。然后 reduce 函数会处理这些中间键值对,并合并相关键的值(请参考图 1)。输入数据使用这样一种方法进行分区,即在并行处理的计算机集群中分区的方法。使用相同的方法,已生成的中间数据将被并行处理,这是处理大量数据的理想方法。

图 1. MapReduce 处理的简化视图
MapReduce 处理的简化视图
MapReduce 处理的简化视图

对于快速刷新器来说,查看图 1 的体系结构,从 map 和 reduce 角度来进行字数统计(因为您将在本文中开发 map 和 reduce 应用程序)。在提供输入数据时(进入 Hadoop 文件系统 [HDFS]),首先分段,然后分配给 map 工作线程(通过作业跟踪器)。虽然 图 2 中的示例显示了一个被分段的简短语句,但是分段的工作数量通常在 128MB 范围内,其原因是建立工作只需要很少的时间,因为有更多的工作要做,以便最大限度地减少这种开销。map 工作线程(在规范的示例中)将工作分割成包含已标记单词和初始值(在此情况下是 1)的单个矢量。在 map 任务完成时(如通过任务跟踪器在 Hadoop 中所定义的),提供工作给 reduce 工作线程。通过代表所发现的键的数量的值,reduce 工作线程将许多键缩减为一个惟一的集合。

图 2. 简单的 MapReduce 示例
简单的 MapReduce 示例
简单的 MapReduce 示例

请注意此过程可在相同的或不同的计算机中出现或者使用不同的数据分区来按顺序或并行完成,且结果仍然是相同的。

虽然规范的视图(用于使用字数统计生成搜索索引)是一种用来查看 Hadoop 方法,但结果是此计算模型被常规地应用到可计算问题上,正如您将要看到的那样。

Hadoop 的灵活性

图 2 中所示的简单示例看,需注意 mapreduce 过程这两个主要元素。虽然这里存在一个这些过程如何工作的传统视图,但是它不是 mapreduce 体系结构所需要的。这就是 Hadoop 的真实力量 — 其灵活性用来实现在某种程度上活动的 mapreduce 过程,这解决了一个特定的应用程序。虽然字数统计示例对于大量的问题是有用且适用的,但是其他的模型仍然在此总体框架内适用。所需的就是使 map 和 reduce 应用程序的开发过程对于 Hadoop 可见。

在其他的应用程序中,Hadoop 已经被用于实现包括神经网络算法的计算机学习应用程序,支持矢量计算机以及 k-means 集群(要获得更多信息,请参考 参考资料 部分)。

数据流

虽然 Hadoop 是一个基于 Java 的框架,但是其有可能在 Java 语言以外的语言中编写 msp 和 reduce 应用程序。Hadoop 内的 实用工具实现了一种数据流胶的类型。通过 实用工具,您可以定义您自己的可执行 map 和 reduce(使用每一个从标准输入 [stdin] 提取的输入和每一个通过标准输出 [stdout] 提供的输出),且 实用工具可适当地读取和写入数据,根据需要调用您的应用程序(请参考清单 3)。

清单 3. 使用 Hadoop 流实用工具
hadoop jar $HADOOP_HOME/hadoop-流.jar \
	-input inputData
	-output outputData
	-mapper map_exec
	-reducer reduce_exec

清单 3 说明如何在 Hadoop 内使用 实用工具,图 3 图形化地显示了如何定义流。请注意这是一个流使用的简单示例。大量的选项可用于制定如何解析数据、制定如何调用图像、为分区器和合成器指定替换图像以及调整其他配置(要获得更多信息,请参考 参考资料 部分)。

图 3. 图形流示例
图形流示例

Ruby 示例

通过已经获得的在 实用工具基本理解上的经验,您已经准备编写一个简单的 Ruby map 和 reduce 应用程序并查看如何在 Hadoop 框架中使用过程。虽然此处的示例伴随着规范的 MapReduce 应用程序,但是稍后您将看到其他的应用程序(取决于您将如何用 map 和 reduce 格式实现它们)。

首选是 mapper。此脚本从 stdin 提取文本输入,首先标记它,然后将一系列键值对发送到 stdout。像大多数面向对象的脚本语言一样,这个任务几乎太简单了。如清单 4 中所示的 mapper 脚本(通过一些注释和空白区域可给与其大一点的大小)。此程序使用一个迭代器来从 stdin 中读取一行,同时另一个迭代器将该行分割成单个的标记。使用为 1 的相关值(通过选项卡分隔)将每一个标记(单词)发送到 stdout。

清单 4. Ruby map 脚本(map.rb)
#!/usr/bin/env ruby

# Our input comes from STDIN
STDIN.each_line do |line|

  # Iterate over the line, splitting the words from the line and emitting
  # as the word with a count of 1.
  line.split.each do |word|
    puts "#{word}\t1"
  end

end

下一步,查看 reduce 应用程序。虽然此应用程序稍微有些复杂,但是使用 Ruby hash(关联阵列)可简化 reduce 操作(请参考清单 5)。此脚本可通过来自 stdin (通过 实用工具传递)的输入数据再次工作且将该行分割成一个单词或值。而后该 hash 会检查该单词;如果发现,则将计数添加到元素。否则,您需要在该单词的 hash 中创建新的条目,然后加载计数(应该是来自 mapper 过程的 1)。在所有输入都被处理以后,通过 hash 可简单迭代且将键值对发送到 stdout。

清单 5. Ruby reduce 脚本(reduce.rb)
#!/usr/bin/env ruby

# Create an empty word hash
wordhash = {}

# Our input comes from STDIN, operating on each line
STDIN.each_line do |line|

  # Each line will represent a word and count
  word, count = line.strip.split

  # If we have the word in the hash, add the count to it, otherwise
  # create a new one.
  if wordhash.has_key?(word)
    wordhash[word] += count.to_i
  else
    wordhash[word] = count.to_i
  end

end

# Iterate through and emit the word counters
wordhash.each {|record, count| puts "#{record}\t#{count}"}

随着 map 和 reduce 脚本的完成,需从命令行测试它们。记得要使用 chmod +x 将这些文件更改为可执行。通过生成输入文件来启动,如清单 6 所示。

清单 6. 生成输入文件
# echo "Hadoop is an implementation of the map reduce framework for " \
	"distributed processing of large data sets." > input
#

通过单词输入,现在您可以测试您的 mapper 脚本,如清单 7 所示。回想此脚本简单地将输入标记到键值对,此处每个值都将是 1(非惟一输入)。

清单 7. 测试 mapper 脚本
# cat input | ruby map.rb
Hadoop	1
is	1
an	1
implementation	1
of	1
the	1
map	1
reduce	1
framework	1
for	1
distributed	1
processing	1
of	1
large	1
data	1
sets.	1
#

到目前为止,一切都很顺利。现在,在原始流格式中将整个应用程序一起调出。在清单 8 中,通过 map 脚本传递您的输入、排序输出(可选步骤)、然后通过 reduce 脚本传递由此产生的中间数据。

清单 8. 使用 Linux 管道的简单 MapReduce
# cat input | ruby map.rb | sort | ruby reduce.rb
large	1
of	2
framework	1
distributed	1
data	1
an	1
the	1
reduce	1
map	1
sets.	1
Hadoop	1
implementation	1
for	1
processing	1
is	1
#

使用 Hadoop 的 Ruby

在 shell 环境中您的 map 和 reduce 脚本按预期工作,通过 Hadoop 将它们放入测试中。我将会跳过 Hadoop 安装任务(参考本系列的 用 Hadoop 进行分布式数据处理,第 1 部分:入门用 Hadoop 进行分布式数据处理,第 2 部分:进阶 以便建立 Hadoop 并使其运行)。

第一步将要在 HDFS 内为您的输入信息创建输入字典,然后提供一个将测试您脚本的简单文件。清单 9 说明了此步骤(有关这些步骤的更多信息,请参考本系列的 用 Hadoop 进行分布式数据处理,第 1 部分:入门用 Hadoop 进行分布式数据处理,第 2 部分:进阶)。

清单 9. 为 MapReduce 过程创建输入数据
# hadoop fs -mkdir input
# hadoop dfs -put /usr/src/linux-source-2.6.27/Documentation/memory-barriers.txt input
# hadoop fs -ls input
Found 1 items
-rw-r--r--  1 root supergroup  78031 2010-06-04 17:36 /user/root/input/memory-barriers.txt
#

下一步,使用 实用工具,通过自定义脚本来调用 Hadoop,简化输出的输入数据和位置(请参考清单 10)。在此示例中请注意 -file 选项会简单地告诉 Hadoop 来打包您的 Ruby 脚本作为部分作业提交。

清单 10. 通过 Ruby MapReduce 脚本使用 Hadoop 流
# hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.2+228-streaming.jar \
  -file /home/mtj/ruby/map.rb -mapper /home/mtj/ruby/map.rb \
  -file /home/mtj/ruby/reduce.rb -reducer /home/mtj/ruby/reduce.rb \
  -input input/* -output output
packageJobJar: [/home/mtj/ruby/map.rb, /home/mtj/ruby/reduce.rb, /var/lib/hadoop-0.20/...
10/06/04 17:42:38 INFO mapred.FileInputFormat: Total input paths to process : 1
10/06/04 17:42:39 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/...
10/06/04 17:42:39 INFO streaming.StreamJob: Running job: job_201006041053_0001
10/06/04 17:42:39 INFO streaming.StreamJob: To kill this job, run:
10/06/04 17:42:39 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job ...
10/06/04 17:42:39 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/...
10/06/04 17:42:40 INFO streaming.StreamJob:  map 0%  reduce 0%
10/06/04 17:43:17 INFO streaming.StreamJob:  map 100%  reduce 0%
10/06/04 17:43:26 INFO streaming.StreamJob:  map 100%  reduce 100%
10/06/04 17:43:29 INFO streaming.StreamJob: Job complete: job_201006041053_0001
10/06/04 17:43:29 INFO streaming.StreamJob: Output: output
#

最后,通过 hadoop 实用工具使用 cat 文件系统操作来探索输出(请参考清单 11)。

Listing 11. Exploring the Hadoop output
# hadoop fs -ls /user/root/output
Found 2 items
drwxr-xr-x  - root supergroup      0 2010-06-04 17:42 /user/root/output/_logs
-rw-r--r--  1 root supergroup  23014 2010-06-04 17:43 /user/root/output/part-00000
# hadoop fs -cat /user/root/output/part-00000 | head -12
+--->|	4
immediate	2
Alpha)	1
enable	1
_mandatory_	1
Systems	1
DMA.	2
AMD64	1
{*C,*D},	2
certainly	2
back	2
this	23
#

在不到 30 行的脚本中,您已经在 Hadoop 框架内实现了 mapreduce 元素并演示了它们的执行。虽然是一个简单的示例,但是通过自定义的和专有的算法说明了 Hadoop 背后真实的力量以及为什么 Hadoop 正在成为一种用于处理大型数据集的流行框架。

Hadoop 的其他应用程序

Hadoop 可用于许多应用程序上,其已超越了为大型数据集简单计算字数的工作。所有这一切的需要就是用矢量格式表达 Hadoop 基础设施可以使用的数据。虽然规范的示例使用矢量表达作为键和值,但是并没有限制您如何来定义值(例如一些值的汇总)。在更加丰富的应用程序集中此灵活性可以为 Hadoop 创造新的机会。

一个一直适合 MapReduce 字数统计模型的有趣的应用程序正在把 Web 服务器访问的频率制表(在开创性 Google 文章中讨论)。对于此应用程序来说,URL 作为键来服务(从 Web 服务器访问日志摄取)。reduce 过程的结果是基于 Web 服务器日志的给定 Web 站点的每次 URL 访问的总数。

在计算机学习用户程序中,Hadoop 已经作为处理大量 GA 个体的规模遗传算法的一种方法(潜在解决方案)。map 过程执行传统的遗传算法,从本地池中搜索最佳单个解决方案。然后 reduce 应用程序成为来自 map 阶段的单个解决方案的集成。这会允许单个节点识别最佳解决方案,然后允许这些解决方案在最适于生存的分布式显示的 reduce 阶段中相互竞争。

另外一个有趣的应用程序被创建用于识别僵尸网络的垃圾邮件。此过程的第一步将会为减少垃圾邮件为目的而对电子邮件按来自给定组织而进行分类(基于一组指纹)。根据过滤的这些数据,对以特定方式(例如参考电子邮件正文中的相同链接)连接的邮件生成一个图表。然后这些相关电子邮件会减少至主机(静态或动态 IP 地址)以识别有问题的僵尸网络。

在应用程序之外通过 map 和 reduce 基元来查看世界,Hadoop 作为在计算机集群之间分配工作的方式非常有用。 Map 和 reduce 并非必须强制某种特定类型的应用程序。相反地,Hadoop 可以被视为一种可以同时将数据和算法分配到主机以获得更快速的并行处理速度的方法。

Hadoop 应用程序生态系统

虽然 Hadoop 提供了一个灵活的架构,但也可以使用其他应用程序转换与其他应用程序的界面。一个有趣的示例称为 Hive,它是一个具有自己特定查询语言(称为 Hive QL)的数据仓库基础结构。Hive 使得 Hadoop 更加熟悉结构化查询语言 (SQL) 背景,同时还支持传统的 MapReduce 基础结构来进行数据处理。

HBase 是另外一种位于 HDFS 顶部的有趣的应用程序。它是一个类似于 Google BigTable 的高性能数据库系统。代替传统的文件处理,HBase 使数据库将 MapReduce 处理的输入和输出格式列表。

最后,Pig 是 Hadoop 中用于分析大型数据集的平台。Pig 提供可编译 map 和 reduce 应用程序的高级语言。

进一步的学习

这是 Hadoop 系列 的最后一篇文章,探索了在适用于 Hadoop 框架的 Ruby 中开发 map 和 reduce 应用程序。希望从这篇文章您可以看到 Hadoop 的真正力量。虽然 Hadoop 将您限制在一个特定的编程模型中,但是这种模型是灵活的且可被应用到大量的应用程序上。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Linux, Java technology, Open source
ArticleID=505628
ArticleTitle=用 Hadoop 进行分布式数据处理,第 3 部分: 应用程序开发
publish-date=08092010