refer: MapReduce: Simplified Data Processing on Large Clusters
摘要
MapReduce 是一种编程模型及其相关实现,用于处理和生成大型数据集。用户定义一个映射(map)函数,该函数处理键/值对以生成一组中间键/值对,以及一个归约(reduce)函数,用于合并所有与相同中间键关联的值。许多现实中的任务都可以在该模型中表达,正如本文所展示的那样。
用这种函数式风格编写的程序会自动并行化,并在由许多普通机器组成的大型集群上运行。运行时系统负责分配输入数据、在多台机器上调度程序执行、处理机器故障以及管理机器间的必要通信。这样,程序员即便没有并行和分布式系统的经验,也可以轻松地利用大规模分布式系统的资源。
我们实现的 MapReduce 在由普通机器组成的大型集群上运行,并且具有极高的可扩展性:一个典型的 MapReduce 计算会在数千台机器上处理多个TB的数据。程序员们发现该系统易于使用:已实现了数百个 MapReduce 程序,每天 Google 集群上执行超过一千个 MapReduce 任务。
1. 引言
在过去的五年中,作者及 Google 的许多人实现了数百种专门用途的计算,这些计算处理大量原始数据(如抓取的文档、网页请求日志等),以生成各种派生数据(例如倒排索引、网页文档的图结构表示、每台主机抓取的页面数摘要、每日最频繁的查询集合等)。大多数此类计算在概念上是简单的。然而,输入数据通常很大,计算需要分布在数百甚至数千台机器上,以便在合理的时间内完成。如何并行化计算、分配数据以及处理故障的问题导致代码变得非常复杂,以至于掩盖了原本简单的计算逻辑。
为了应对这一复杂性,我们设计了一种新的抽象,使我们可以表达简单的计算,同时隐藏并行化、容错、数据分布和负载均衡的繁琐细节。我们的抽象灵感来自 Lisp 和其他许多函数式语言中的 map 和reduce 原语。我们意识到,我们的大多数计算都涉及对输入中的每个逻辑“记录”应用一个 map 操作,以计算一组中间键/值对,然后对具有相同键的所有值应用 reduce 操作,以适当合并派生数据。我们采用用户指定的 map 和 reduce 操作的函数式模型,使我们能够轻松并行化大型计算,并将重新执行作为主要的容错机制。
这项工作的主要贡献在于一个简单且功能强大的接口,使大规模计算的自动并行化和分布成为可能,同时实现了在大规模普通 PC 集群上高性能的接口实现。第2节描述了基本的编程模型,并给出了几个示例。第 3 节介绍了针对集群计算环境的 MapReduce 接口实现。第 4 节描述了一些我们发现有用的编程模型改进。第 5 节包含了我们实现对多种任务的性能测量。第 6 节探讨了 MapReduce 在 Google 内部的应用,包括我们将其用于生产索引系统重写的经验。第 7 节讨论了相关的未来的工作。
2. 编程模型
该计算模型接收一组输入键/值对,并生成一组输出键/值对。MapReduce 库的用户通过两个函数来表达计算过程:Map
和 Reduce
。
Map
函数由用户编写,接收一个输入对并生成一组中间键/值对。MapReduce 库会将所有与相同中间键关联的中间值分组,并将其传递给Reduce
函数。Reduce
函数也由用户编写,接收一个中间键和该键的值集合。它会将这些值合并成一个较小的值集合,通常每次Reduce
调用会生成零个或一个输出值。中间值通过迭代器提供给用户的Reduce
函数,从而可以处理过大而无法放入内存的值列表。
2.1 示例
考虑在大量文档集合中统计每个单词出现次数的问题。用户可以编写类似于以下伪代码的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
map(String key, String value):
// key: 文档名称
// value: 文档内容
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: 一个单词
// values: 计数字符串列表
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
在此示例中,map
函数发出每个单词及其出现次数(仅为“1”)。reduce
函数将发出的所有计数相加并产生每个特定单词的总数。
此外,用户编写的代码需要填充 mapreduce
规范对象,包括输入和输出文件的名称,以及可选的调优参数。然后用户调用 MapReduce
函数并传递此规范对象。用户的代码与 MapReduce 库(用 C++ 实现)链接在一起。附录A中包含此示例的完整程序代码。
2.2 数据类型
尽管前面的伪代码是以字符串输入和输出表示的,用户定义的 map
和 reduce
函数实际上具有以下关联类型:
1
2
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
也就是说,输入键和值来自一个与输出键和值不同的域。此外,中间键和值与输出键和值属于相同域。
我们的 C++ 实现将字符串传递给用户定义的函数,并且由用户代码负责在字符串和适当的类型之间进行转换。
2.3 更多示例**
以下是一些可以用 MapReduce 表达的简单有趣程序示例:
- 分布式Grep(模式匹配):
map
函数如果行与给定模式匹配则发出该行,reduce
函数为一个简单的“身份函数”,它将传入的中间数据复制到输出中。 - URL访问频率统计:
map
函数处理网页请求日志并输出<URL, 1>
。reduce
函数将相同URL的所有值相加并发出<URL, 总计数>
。 - 反向网页链接图:
map
函数为每个在页面中发现的链接输出<目标, 来源>
。reduce
函数将与给定目标URL关联的所有来源URL拼接为列表并发出<目标, 列表(来源)>
。 - 每台主机的词向量(Term-Vector): 词向量表示一个文档或文档集中的重要单词及其频率。
map
函数为每个文档发出一个<主机名, 词向量>
对,reduce
函数则合并每台主机的词向量。 - 倒排索引:
map
函数解析每个文档,发出<单词, 文档ID>
对。reduce
函数对每个单词的所有文档 ID 进行排序并发出<单词, 文档ID列表>
。 - 分布式排序:
map
函数提取每条记录的键并发出<键, 记录>
。reduce
函数保持对键/值对不变并发出。此计算依赖于第 4.1 节中的分区功能和第 4.2 节中的排序属性。
3. 实现
MapReduce 接口可以有多种不同的实现,具体选择取决于计算环境。例如,一种实现可能适用于小型共享内存计算机,另一种适合大型的 NUMA(非一致性内存访问)多处理器系统,还有一种适合更大规模的联网机器集合。
本节描述了针对 Google 广泛使用的计算环境的实现:由普通 PC 组成的大型集群,这些 PC 通过交换式以太网连接在一起。在我们的环境中:
- 机器通常是运行 Linux 的双处理器 x86 系统,每台机器具有 2-4 GB 的内存。
- 使用普通的网络硬件,每台机器的网络带宽通常为 100 Mbps 或 1 Gbps,但整体双向带宽平均远低于这一水平。
- 集群由成百上千台机器组成,因此机器故障是常见的。
- 存储由直接连接到各个机器的廉价 IDE 硬盘提供,并通过我们内部开发的分布式文件系统来管理,这个文件系统使用复制来提高在不可靠硬件上的可用性和可靠性。
- 用户将作业提交到调度系统中,每个作业包含一组任务,调度器将这些任务分配到集群中可用的机器上。
3.1 执行概览
Map 调用通过自动对输入数据进行分区分配到多台机器上。输入数据分区为 M
份,可以由不同的机器并行处理。通过分区函数将中间键空间划分为 R
份,从而分配 Reduce
调用。例如,使用 hash(key) mod R
分区函数。用户可以指定分区数(R
)和分区函数。
图1展示了 MapReduce 操作的整体流程。当用户程序调用 MapReduce 函数时,执行以下操作(图中各项的编号对应下方说明中的序号):
- MapReduce 库在用户程序中首先将输入文件分割成
M
个部分,每个部分通常为 16 到 64 MB(用户可以通过一个可选参数控制)。然后在集群中的多台机器上启动程序的多个副本。 - 程序的一个副本是主控(master),其余的是由主控分配任务的工作节点(worker)。主控分配
M
个Map
任务和R
个Reduce
任务。主控选择空闲的工作节点,并将一个 Map 任务或 Reduce 任务分配给每个节点。 - 被分配 Map 任务的工作节点读取对应输入分区的内容,将输入数据解析为键/值对,并将每个对传递给用户定义的Map函数。Map 函数生成的中间键/值对缓存在内存中。
- 缓存的数据对被定期写入本地磁盘,并按分区函数划分为
R
个区域。这些缓存对的存储位置被反馈给主控,主控负责将这些位置转发给 Reduce 任务的工作节点。 - 当主控通知 Reduce 节点这些位置时,Reduce 节点使用远程过程调用从 Map 节点的本地磁盘读取缓存的数据。当 Reduce 节点读取完所有中间数据后,它按中间键对数据进行排序,以便将相同键的值组合在一起。由于通常许多不同的键对应于同一个 Reduce 任务,所以排序是必要的。如果中间数据太大无法放入内存,则使用外排序。
- Reduce 节点遍历排序后的中间数据,对于每个独特的中间键,将键和值集合传递给用户定义的 Reduce 函数。Reduce 函数的输出追加到该 Reduce 分区的最终输出文件中。
- 当所有 Map 和 Reduce 任务完成后,主控唤醒用户程序,此时 MapReduce 调用返回给用户代码。
在成功完成后,MapReduce 执行的输出存储在 R
个输出文件中(每个Reduce任务一个文件,文件名由用户指定)。通常用户不需要将这些 R
个输出文件合并为一个文件,因为他们常常将这些文件作为另一个 MapReduce 调用的输入,或从能够处理多文件输入的分布式应用中使用这些文件。
3.2 主控数据结构
主控保存若干数据结构。对于每个 Map 任务和 Reduce 任务,它存储任务的状态(空闲、进行中或已完成)以及工作节点的身份(非空闲任务)。
主控是中间文件区域位置在 Map 任务和 Reduce 任务之间传播的通道。因此,对于每个已完成的 Map 任务,主控存储由该任务生成的 R
个中间文件区域的位置和大小。此位置信息在 Map 任务完成时更新,并增量推送给正在进行的 Reduce 任务。
3.3 容错性
MapReduce 库设计用于在数百甚至数千台机器上处理大量数据,因此它必须能够容忍机器故障。
- 工作节点故障:主控定期向每个工作节点发送 ping 请求。如果在指定时间内未收到响应,则将该节点标记为故障节点。该节点完成的所有 Map 任务都会重置为初始空闲状态,从而可以在其他节点上重新调度。同样,在故障节点上进行中的 Map 任务或 Reduce 任务也会重置为空闲状态,并变得可以重新调度。由于故障节点的输出存储在其本地磁盘中,因此已完成的 Map 任务需要重新执行,而 Reduce 任务的输出存储在全局文件系统中,因此无需重新执行。
- 主控故障:可以让主控定期将数据结构状态写入检查点文件。如果主控进程故障,可以从最后一次检查点状态重新启动一个副本。由于只有一个主控,其故障发生概率低,因此我们当前的实现会在主控故障时中止 MapReduce 计算。用户可以选择检查这种情况并重新尝试执行 MapReduce 操作。
3.4 数据本地化
在我们的计算环境中,网络带宽是一种相对稀缺的资源。我们通过利用输入数据(由Google文件系统GFS管理)存储在集群机器本地磁盘上的特点来节省网络带宽。GFS 将每个文件分成 64 MB 的块,并在不同机器上存储多个副本(通常为 3 个副本)。MapReduce 主控在调度 Map 任务时会考虑输入文件的位置信息,尽可能将 Map 任务调度到包含对应输入数据副本的机器上。如果无法调度到包含数据副本的机器,则优先将 Map 任务调度到接近该数据副本的机器(例如位于同一个网络交换机的机器上)。在集群大规模运行 MapReduce 时,大部分输入数据是本地读取的,不消耗网络带宽。
3.5 任务粒度
我们将 Map 阶段划分为 M
份,将 Reduce 阶段划分为 R
份,如上所述。理想情况下,M
和 R
的数量应该远大于工作节点的数量。让每个工作节点执行多个不同任务有助于动态负载平衡,并且在节点故障时更快地恢复:该节点已完成的许多 Map 任务可以分配给所有其他工作节点。
在实际操作中,我们选择 M
值以使每个单独任务约为 16 MB 到 64 MB 输入数据(以使上文描述的本地化优化最为有效),并将 R
设置为我们预计使用的工作节点数量的几倍。我们经常以 M=200,000
和 R=5,000
进行 MapReduce 计算,使用 2,000 个工作节点。
3.6 备份任务
MapReduce 操作耗时增加的常见原因之一是“拖尾任务”:机器因种种原因导致最后几项 Map 或 Reduce 任务的执行时间异常长。拖尾可能由多个原因引起,例如,磁盘问题导致读性能从 30 MB/s 下降到 1 MB/s,或集群调度系统在机器上调度了其他任务,导致 MapReduce 代码因竞争 CPU、内存、本地磁盘或网络带宽而变慢。最近我们遇到的问题是初始化代码的一个错误,导致处理器缓存被禁用,计算速度因此降低了超过一百倍。
我们使用了一种通用机制来缓解拖尾任务问题。当 MapReduce 操作接近完成时,主控会为剩下的正在进行的任务安排备份执行。当主任务或备份任务中的一个完成后,该任务就被标记为完成。我们对该机制进行了调优,使其通常只增加少量的计算资源,但能显著减少完成大规模 MapReduce 操作的时间。例如,第 5.3 节描述的排序程序在禁用备份任务机制时完成时间增加了 44%。
3.7 本地化执行
调试 Map 或 Reduce 函数的问题较为棘手,因为实际的计算发生在分布式系统上,通常涉及数千台机器,并且由主控动态决定任务分配。为便于调试、性能分析和小规模测试,我们开发了一个 MapReduce 库的替代实现,它在本地机器上顺序执行 MapReduce 操作的所有任务。用户可通过特定标志调用该实现,从而在本地计算中限制某些 Map 任务的执行。这样用户可以轻松使用任何适合的调试或测试工具(例如 gdb
)。
3.8 状态信息
主控运行一个内部 HTTP 服务器,并输出一组供人工查看的状态页面。这些状态页面显示计算进度(例如已完成任务数、进行中的任务数、输入字节数、中间数据字节数、输出字节数、处理速率等)。页面还包含指向每个任务生成的标准错误和标准输出文件的链接。用户可以利用这些数据预测计算完成所需的时间,并判断是否需要增加计算资源。这些页面还可帮助用户在计算比预期慢得多时查明原因。
此外,顶级状态页面显示哪些工作节点故障,及其在故障时所执行的 Map 和 Reduce 任务。这些信息有助于诊断用户代码中的错误。
3.9 计数器
MapReduce 库提供计数器功能,用于统计各种事件的出现次数。例如,用户代码可能需要统计总词数或索引的德文文档数等。
用户代码可以通过创建命名计数器对象并在 Map 和/或 Reduce 函数中适当地增加计数来使用该功能。例如:
1
2
3
4
5
6
7
8
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
各个工作节点的计数器值会定期汇报给主控(通过 ping 响应附带传送)。主控聚合成功完成的 Map 和 Reduce 任务的计数器值,并在 MapReduce 操作完成后将其返回给用户代码。主控状态页面上还显示当前的计数器值,用户可以实时监控计算进展。聚合计数器值时,主控会排除重复执行的 Map 或 Reduce 任务的影响,以避免重复计数(重复执行可能来自备份任务机制或因故障引起的任务重新执行)。
某些计数器值由 MapReduce 库自动维护,例如已处理的输入键/值对数量和已生成的输出键/值对数量。
4. 改进
尽管仅通过编写 Map
和 Reduce
函数的基本功能已能满足大多数需求,但我们发现一些扩展功能非常有用。本节介绍这些改进。
4.1 分区函数
MapReduce 的用户可以指定所需的 Reduce 任务/输出文件数目(R
)。数据根据中间键的分区函数被划分到这些任务中。默认的分区函数使用散列(例如“hash(key) mod R
”),这通常会生成较为均衡的分区。然而,在某些情况下,以键的其他特性来分区更为合适。例如,当输出键是 URL 时,我们希望同一个主机的所有条目都位于相同的输出文件中。为此,MapReduce 库允许用户提供自定义的分区函数。比如,使用“hash(Hostname(urlkey)) mod R
”作为分区函数可确保来自同一主机的所有 URL 都在同一个输出文件中。
4.2 排序保证
我们保证在给定分区内,中间键/值对会按键的升序处理。此排序保证使得每个分区的输出文件都可以按顺序生成,这在需要支持按键值高效随机访问查找的输出文件格式时非常有用,或者当用户希望排序后的数据更方便地使用时也非常有帮助。
4.3 Combiner 函数
在某些情况下,每个 Map 任务生成的中间键会重复出现很多次,并且用户指定的 Reduce 函数是交换结合的。一个典型的例子是第 2.1 节的词频统计示例。由于单词频率通常遵循 Zipf 分布,每个 Map 任务会生成大量的 <the, 1>
记录。所有这些计数将通过网络发送到单个 Reduce 任务,然后由 Reduce 函数相加以生成总数。我们允许用户指定一个可选的 Combiner 函数,以在数据通过网络传输之前进行部分合并。
Combiner 函数在执行Map任务的每台机器上执行。通常 Combiner 和 Reduce 函数由同一段代码实现。Combiner 和 Reduce 函数的唯一区别在于 MapReduce 库对输出的处理方式:Reduce 函数的输出写入最终输出文件,而 Combiner 函数的输出写入将传送到 Reduce 任务的中间文件。
部分合并显著加速了某些类型的 MapReduce 操作。附录A中包含一个使用 Combiner 的示例。
4.4 输入和输出类型
MapReduce 库支持多种输入数据格式。例如,“文本”模式输入将每行视为一个键/值对:键是文件中的偏移量,值是该行的内容。另一种常见的格式是存储按键排序的一组键/值对。每个输入类型实现知道如何将自身划分为有意义的范围,以便作为独立的 Map 任务进行处理(例如,文本模式的范围划分确保只在行边界上进行拆分)。用户可以通过提供简单的读取接口实现,添加对新输入类型的支持,但大多数用户会使用少数几种预定义的输入类型。
读取器不一定必须提供从文件读取的数据。例如,可以很容易地定义一个读取器,它从数据库或内存映射的数据结构中读取记录。
同样,我们支持一组输出类型,用于以不同格式生成数据,用户代码也可以轻松地添加对新输出类型的支持。
4.5 副作用
在某些情况下,MapReduce 用户发现产生附加文件作为 Map 和/或 Reduce 操作的输出是非常方便的。我们依赖应用程序编写者确保此类副作用的操作是原子和幂等的。通常应用程序先写入一个临时文件,并在完全生成后原子地重命名该文件。
我们不提供单个任务生成的多个输出文件的原子两阶段提交支持。因此,生成具有跨文件一致性要求的多个输出文件的任务应为确定性任务。在实际操作中,这种限制并没有引起任何问题。
4.6 跳过错误记录
有时用户代码中的 Bug 会导致 Map 或 Reduce 函数在某些记录上崩溃。此类错误会阻止 MapReduce 操作完成。通常的解决方法是修复 Bug,但有时这不可行,比如错误出现在没有源码的第三方库中。此外,有时可以忽略少数记录,例如在对大型数据集进行统计分析时。我们提供了一个可选的执行模式,在该模式下 MapReduce 库检测出导致崩溃的记录,并跳过这些记录以继续进行。
每个工作节点安装了一个信号处理程序,以捕获段错误和总线错误。在调用用户的 Map 或 Reduce 操作之前,MapReduce 库会将参数的序列号存储在一个全局变量中。如果用户代码产生信号,信号处理程序会发送包含序列号的“最后一搏” UDP 数据包给 MapReduce 主控。当主控看到某个记录出现多次失败时,在下一次重新执行相应的 Map 或 Reduce 任务时指示跳过该记录。
4.7 本地执行
在 Map 或 Reduce 函数中调试问题可能较为棘手,因为实际的计算发生在分布式系统上,通常涉及数千台机器,主控动态决定任务分配。为了便于调试、性能分析和小规模测试,我们开发了一个 MapReduce 库的替代实现,它在本地机器上顺序执行 MapReduce 操作的所有任务。用户可以通过调用一个特殊标志来启用该实现,并轻松使用任何适合的调试或测试工具(例如 gdb
)。
4.8 状态信息
主控运行一个内部 HTTP 服务器,输出一组供人工查看的状态页面。这些状态页面显示了计算的进展情况,例如已完成任务数、进行中任务数、输入字节数、中间数据字节数、输出字节数、处理速率等。页面还包含指向每个任务生成的标准错误和标准输出文件的链接。用户可以利用这些数据预测计算完成所需时间,并判断是否需要增加计算资源。这些页面还可帮助用户在计算比预期慢得多时查明原因。
此外,顶级状态页面显示哪些工作节点失败及其失败时处理的 Map 和 Reduce 任务。此信息有助于诊断用户代码中的错误。
5. 性能
在本节中,我们对在大型集群上运行的两类计算的 MapReduce 性能进行测量。一种计算是搜索约 1 TB 的数据以查找特定模式,另一种是对约 1 TB 的数据进行排序。这两个程序代表了 MapReduce 用户实际编写程序的很大一部分:一类程序将数据从一种表示形式转换为另一种,另一类则从大数据集中提取出少量有用数据。
5.1 集群配置
所有程序在一个约 1800 台机器的集群上执行。每台机器配备了两个 2 GHz 的 Intel Xeon 处理器(支持超线程)、4 GB 内存、两个 160 GB 的 IDE 硬盘和千兆以太网连接。这些机器通过两级树形交换网络连接,根部的总带宽约为 100-200 Gbps。所有机器位于同一托管设施中,因此任意两台机器之间的往返时间小于一毫秒。
4 GB 内存中约有 1-1.5 GB 被集群中的其他任务保留。这些程序在周六下午运行,当时 CPU、硬盘和网络大多处于空闲状态。
5.2 Grep
grep
程序扫描约 1010 个 100 字节的记录,查找一个较为稀有的三字符模式(该模式在 92337 个记录中出现)。输入被分割为约 64 MB 的部分(M = 15000
),整个输出被写入一个文件(R = 1
)。
图2显示了计算随时间的进展情况。Y轴表示输入数据的扫描速率。随着越来越多的机器分配给此 MapReduce 计算,速率逐渐增加,当 1764 个工作节点被分配时达到了 30 GB/s 以上。当 Map 任务完成时,速率开始下降,并在约 80 秒时降为零。整个计算从开始到结束耗时约150秒,其中包括约 1 分钟的启动开销。这部分开销是因为需要将程序分发到所有工作节点,并与 GFS 交互以打开 1000 个输入文件并获取本地化优化所需的信息。
5.3 排序
sort
程序对约 1010 个 100 字节的记录(约 1 TB 数据)进行排序。该程序基于 TeraSort 基准测试。
排序程序包含不到 50 行用户代码。三行 Map
函数从一行文本中提取一个 10 字节的排序键,并将键和原始文本行作为中间键/值对输出。我们使用了内置的 Identity
函数作为 Reduce
操作,该函数将中间键/值对不变地输出。最终的排序结果被写入一组双向复制的 GFS 文件(即输出的数据总量为 2 TB,以确保可靠性和可用性)。
与之前相同,输入数据被分割成 64 MB 的部分(M = 15000
),我们将排序结果分区为 4000 个文件(R = 4000
)。分区函数根据键的前几个字节将键划分为 R
片。
我们使用的分区函数预先了解键的分布。对于一般的排序程序,我们会添加一个预处理的 MapReduce 操作,以收集键的样本并使用样本键的分布来计算最终排序阶段的分割点。
图3(a)显示了正常执行排序程序的进展。左上图显示了输入数据的读取速率。速率峰值约为 13 GB/s,由于所有 Map 任务在 200 秒内完成,速率很快降低。需要注意的是,此时输入速率低于 grep
程序。这是因为排序中的 Map 任务消耗了约一半时间和 I/O 带宽将中间输出写入本地磁盘,而 grep
的中间输出几乎没有大小。
左中图显示了从 Map 任务到 Reduce 任务的数据网络传输速率。这一“洗牌”过程在第一个 Map 任务完成后立即开始。图中第一个高峰是约 1700 个 Reduce 任务的首批数据传输。约 300 秒后,第一批 Reduce 任务完成,开始传输剩余 Reduce 任务的数据。所有数据传输在 600 秒内完成。
左下图显示了 Reduce 任务将排序数据写入最终输出文件的速率。第一个传输阶段结束后,与写入阶段开始之间有一个延迟,因为此时机器正在对中间数据进行排序。接下来写入的速率约为 2-4 GB/s,所有写入在约 850 秒内完成。包括启动开销,整个计算耗时 891 秒,接近 TeraSort 基准测试的最佳结果1057秒。
几点值得注意:由于本地化优化,大部分数据从本地磁盘读取,绕过了带宽受限的网络,因此输入速率高于洗牌速率和输出速率。洗牌速率高于输出速率,因为输出阶段会写入两份数据(我们为可靠性和可用性复制了输出文件)。若基础文件系统采用纠删码而非复制,写入数据的网络带宽需求将会减少。
5.4 备份任务的效果
图3(b)显示了禁用备份任务的排序程序执行情况。执行流程与图3(a)相似,但在最后阶段出现了一个非常长的尾部,几乎没有写入活动。960 秒后,除了 5 个 Reduce 任务外其他任务全部完成,然而这最后几个任务的完成耗时 300 秒。整个计算耗时 1283 秒,比正常执行时间增加了 44%。
5.5 机器故障
图3(c)显示了在排序程序执行中故意终止 1746 个工作进程中的 200 个后的情况。集群调度系统立即在这些机器上重启新的工作进程(因为只有进程被终止,机器本身仍在正常运行)。
进程终止显示为负的输入速率,因为先前已完成的部分 Map 工作失效(由于相应的Map节点被终止)并需要重新执行。重新执行的 Map 工作很快完成。整个计算用时 933 秒(仅比正常执行时间增加5%)。
6. 实践经验
我们于 2003 年 2 月编写了 MapReduce 库的第一个版本,并在 2003 年 8 月进行了重大改进,包括本地化优化、任务执行的动态负载均衡等。从那时起,我们对 MapReduce 库的广泛适用性感到惊喜。它在 Google 内部的众多领域得到应用,包括:
- 大规模的机器学习问题,
- Google新闻(Google News)和Froogle(Google购物)产品中的聚类问题,
- 用于生成热门查询报告的数据提取(例如Google Zeitgeist),
- 从大量网页中提取属性信息用于新实验和产品(例如从网页中提取地理位置信息以支持本地化搜索),
- 大规模的图计算。
图4显示了在我们的主代码管理系统中 MapReduce 程序数量的显著增长,从 2003 年初的 0 增长到 2004 年 9 月的近 900 个不同的实例。MapReduce 之所以成功,原因在于它使得编写一个简单的程序并在数千台机器上高效运行成为可能,大大加快了开发和原型制作的周期。此外,它还允许没有分布式和/或并行系统经验的程序员轻松利用大量计算资源。
在每个作业结束时,MapReduce 库会记录作业使用的计算资源统计数据。表1显示了 2004 年 8 月在 Google 运行的一些 MapReduce 作业的统计数据。
6.1 大规模索引
我们迄今为止对 MapReduce 最显著的应用之一是对生产索引系统的全面重写。该索引系统生成用于 Google 网页搜索服务的数据结构。索引系统以由爬虫系统获取的文档集合为输入,并存储为一组 GFS 文件。这些文档的原始内容超过 20 TB 。索引过程通过 5 到 10 个 MapReduce 操作顺序执行。使用 MapReduce(代替旧版索引系统中的分布式处理流程)带来了以下几个好处:
- 索引代码更简单、更小且更易于理解,因为 MapReduce 库隐藏了容错、数据分布和并行化相关的代码。例如,计算的某一阶段的代码行数从约 3800 行 C++ 代码减少到约 700 行。
- MapReduce 库的高性能使我们可以将概念上无关的计算分开,而不是将它们混合在一起以避免对数据的多次处理。这使得索引过程易于更改。例如,在旧索引系统中花费几个月才能完成的一个更改,在新系统中只用了几天。
- 索引过程的操作变得更容易,因为机器故障、慢速机器和网络问题引起的大多数问题都由 MapReduce 库自动处理,无需操作人员干预。此外,通过向索引集群添加新机器,可以轻松提高索引过程的性能。
7. 相关工作
许多系统都提供了受限的编程模型,并利用这些限制来自动实现计算的并行化。例如,在 N
个处理器上,使用前缀计算可以在 log N
时间内计算一个 N
元素数组的所有前缀的关联函数。可以将 MapReduce 看作是对这些模型的简化和提炼,基于我们在大规模实际计算中的经验。此外,我们提供了一个容错实现,能够扩展到数千个处理器上。相比之下,大多数并行处理系统仅在较小规模上实现,且将处理机器故障的细节留给程序员。
批量同步编程(Bulk Synchronous Programming)和某些 MPI(消息传递接口)原语提供了较高层次的抽象,使得编写并行程序更为容易。与这些系统的主要区别在于,MapReduce 利用受限的编程模型来自动并行化用户程序,并提供透明的容错机制。
我们的本地化优化灵感来自于类似主动磁盘的技术,该技术将计算推向接近本地磁盘的处理元素,以减少 I/O 子系统或网络中传输的数据量。我们运行在直接连接少量磁盘的普通处理器上,而非直接在磁盘控制器上运行,但总体思路相似。
我们的备份任务机制类似于 Charlotte 系统中采用的急切调度机制。简单的急切调度的一个缺点是,如果某个任务导致重复失败,整个计算可能无法完成。我们通过跳过损坏记录的机制修复了该问题的某些情况。
MapReduce 的实现依赖于内部的集群管理系统,该系统负责在大量共享机器上分发并运行用户任务。尽管这不是本文的重点,集群管理系统在理念上与 Condor 系统类似。
MapReduce 库中的排序功能在操作上与 NOW-Sort 类似。源机器(Map 工作节点)将要排序的数据分区并发送到一个 Reduce 工作节点。每个 Reduce 工作节点本地对其数据进行排序(尽量在内存中)。当然,NOW-Sort 没有我们库中用户可定义的 Map 和 Reduce 函数,使得我们的库在应用上更为广泛。
River 提供了一种编程模型,进程通过分布式队列互相传递数据。与 MapReduce 类似,River 系统旨在即便在非均匀的硬件或系统波动下也提供良好的平均性能。River 通过精心调度磁盘和网络传输以实现平衡的完成时间。MapReduce 则采取不同的方法。通过限制编程模型,MapReduce 框架能够将问题划分为大量细粒度任务,这些任务动态调度到可用的工作节点上,使得速度更快的工作节点处理更多任务。限制编程模型还使我们能够在作业结束时安排任务的冗余执行,从而在遇到非均匀情况(如缓慢或停滞的工作节点)时显著减少完成时间。
BAD-FS 的编程模型与 MapReduce 非常不同,且目标是跨广域网执行作业。然而,两者在基本方面有两个相似点:
(1) 两者都使用冗余执行来从故障中恢复数据。
(2) 两者都采用本地化调度以减少跨拥挤网络链路传输的数据量。
TACC 是一种旨在简化高可用性网络服务构建的系统。与 MapReduce 一样,TACC 使用重新执行机制来实现容错。
8. 结论
MapReduce 编程模型在 Google 的许多不同应用中获得了成功。我们将这一成功归功于以下几个原因。首先,该模型易于使用,即便是没有分布式和并行系统经验的程序员也可以使用,因为它隐藏了并行化、容错、本地化优化和负载平衡的细节。其次,各种问题都可以轻松地表达为 MapReduce 计算。例如,MapReduce 用于生成 Google 生产网页搜索服务的数据、用于排序、数据挖掘、机器学习等众多系统。第三,我们开发了一个可以扩展到包含数千台机器的大型集群的 MapReduce 实现。该实现高效利用了这些机器资源,因此适用于 Google 遇到的许多大型计算问题。
我们从这项工作中学到了几件事。首先,限制编程模型使得并行化和分布计算变得简单,同时使得这些计算具有容错能力。其次,网络带宽是一种稀缺资源,因此我们系统中的多个优化都旨在减少跨网络传输的数据量:本地化优化允许我们从本地磁盘读取数据,而将中间数据写入本地磁盘的单个副本则节省了网络带宽。第三,冗余执行可用于减少缓慢机器的影响,同时处理机器故障和数据丢失。
致谢
Josh Levenberg 在基于自己和他人的建议经验之上,增加了许多新功能,对用户层 MapReduce API 进行了大量修改和扩展。MapReduce 从 Google 文件系统(GFS)读取输入并写入输出。我们感谢Mohit Aron、Howard Gobioff、Markus Gutschke、David Kramer、Shun-Tak Leung以及Josh Redstone为开发GFS所做的工作。我们还感谢 Percy Liang 和 Olcan Sercinoglu 为开发 MapReduce 使用的集群管理系统所做的工作。Mike Burrows、Wilson Hsieh、Josh Levenberg、Sharon Perl、Rob Pike和Debby Wallach对本文早期版本提供了宝贵的意见。匿名的 OSDI 评审员和我们本论文的监督者Eric Brewer也提出了许多有用的改进建议。最后,我们感谢 Google 工程团队中 MapReduce 的所有用户,他们为 MapReduce 提供了宝贵的反馈、建议和错误报告。
参考文献
Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. 高性能的网络工作站排序。在1997年ACM SIGMOD国际数据管理会议论文集,亚利桑那州图森,1997年5月。
Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. 使用River实现集群I/O:将快速情况变为常态。在并行和分布式系统中的第六届输入/输出研讨会(IOPADS ‘99)论文集,第10-22页,美国佐治亚州亚特兰大,1999年5月。
Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte:网络元计算。在第9届国际并行与分布式计算系统会议论文集中,1996年。
Luiz A. Barroso, Jeffrey Dean, and Urs Hölzle. 行星级的Web搜索:Google集群架构。IEEE Micro,23(2):22–28,2003年4月。
John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. 批量感知分布式文件系统中的显式控制。在第1届USENIX网络系统设计和实现研讨会NSDI,2004年3月。
Guy E. Blelloch. 扫描作为并行操作的基本操作。IEEE 计算机事务,C-38(11),1989年11月。
Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. 基于集群的可扩展网络服务。在第16届ACM操作系统原则研讨会论文集,第78–91页,法国圣马洛,1997年。
Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. Google文件系统。在第19届操作系统原则研讨会论文集,第29–43页,美国纽约州乔治湖,2003年。
S. Gorlatch. 系统化高效地并行化扫描和其他列表同构。在L. Bouge、P. Fraigni- aud、A. Mignotte和Y. Robert编著的“Euro-Par’96. 并行处理”中,Lecture Notes in Computer Science 1124,第401–408页。Springer-Verlag,1996年。
Jim Gray. 排序基准测试主页。http://research.microsoft.com/barc/SortBenchmark/
William Gropp, Ewing Lusk, and Anthony Skjellum. 使用MPI:具有消息传递接口的可移植并行编程。麻省理工学院出版社,剑桥,马萨诸塞州,1999年。
L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. 钻石:用于交互式搜索的早期丢弃存储架构。在2004年USENIX文件和存储技术(FAST)会议论文集中,2004年4月。
Richard E. Ladner and Michael J. Fischer. 并行前缀计算。ACM期刊,27(4):831–838,1980年。
Michael O. Rabin. 为安全性、负载平衡和容错的高效信息分散。ACM期刊,36(2):335–348,1989年。
Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. 用于大规模数据处理的主动磁盘。IEEE计算机,第68–74页,2001年6月。
Douglas Thain, Todd Tannenbaum, and Miron Livny. 分布式计算的实践:Condor的经验。并发与计算:实践与经验,2004年。
L. G. Valiant. 一种并行计算的桥接模型。ACM通信,33(8):103–111,1997年。
Jim Wyllie. SPSort:如何快速排序一太字节的数据。http://alme1.almaden.ibm.com/cs/spsort.pdf
附录A - 词频统计
本节包含一个统计每个独特单词在输入文件集合中出现次数的程序。文件名在命令行中指定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include "mapreduce/mapreduce.h"
// 用户的 Map 函数
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// 跳过前导空白符
while ((i < n) && isspace(text[i]))
i++;
// 找到单词结尾
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start, i - start), "1");
}
}
};
REGISTER_MAPPER(WordCounter);
// 用户的 Reduce 函数
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// 遍历具有相同键的所有条目并将值相加
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// 发出输入键的总和
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 将输入文件列表存储到 "spec" 中
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// 指定输出文件:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可选:在Map任务中做部分合并,以节省网络带宽
out->set_combiner_class("Adder");
// 调优参数:最多使用2000台机器,每个任务使用最多100 MB内存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 现在运行它
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成:`result` 结构包含关于计数器、耗时、机器使用情况等的信息
return 0;
}