MapReduce 排序算法是如何工作的?

用于演示 MapReduce 强大功能的主要示例之一是 Terasort benchmark。我很难理解 MapReduce 环境中使用的排序算法的基本原理。

对我来说,排序只是确定一个元素与所有其他元素之间的相对位置。因此,排序涉及到将“一切”与“一切”进行比较。你的平均排序算法(快速、泡沫、 ... ...)只是用一种聪明的方式做到了这一点。

In my mind splitting the dataset into many pieces means you can sort a single piece and then you still have to integrate these pieces into the 'complete' fully sorted dataset. Given the terabyte dataset distributed over thousands of systems I expect this to be a huge task.

那么,这到底是如何实现的呢? MapReduce 排序算法是如何工作的呢?

Thanks for helping me understand.

67084 次浏览

谷歌参考资料: MapReduce: 大型集群上的简化数据处理

出现在 :
OSDI’04: 第六届操作系统设计与实现研讨会,
2004年12月,加利福尼亚州旧金山。

该链接有一个 PDF 和 HTML-Slide 参考。

还有一个带有实现引用的 附有描述的维基百科页面

还有批评,

David DeWitt and Michael Stonebraker, pioneering experts in parallel databases and shared nothing architectures, have made some controversial assertions about the breadth of problems that MapReduce can be used for. They called its interface too low-level, and questioned whether it really represents the paradigm shift its proponents have claimed it is. They challenge the MapReduce proponents' claims of novelty, citing Teradata as an example of prior art that has existed for over two decades; they compared MapReduce programmers to Codasyl programmers, noting both are "writing in a low-level language performing low-level record manipulation". MapReduce's use of input files and lack of schema support prevents the performance improvements enabled by common database system features such as B-trees and hash partitioning, though projects such as PigLatin and Sawzall are starting to address these problems.

Just guessing...

给定一个巨大的数据集,您将把数据分割成一些块,以便并行处理(可能是通过记录编号,即记录1-1000 = 分区1,等等)。

将每个分区分配/调度到集群中的特定节点。

每个集群节点将进一步将分区分割(映射)成自己的迷你分区,可能是通过键字母顺序。因此,在分区1中,获取所有以 A 开头的内容并将其输出到小分区 A 的 x 中。用序列号替换 x (这可能是调度程序的工作)。也就是说,给我下一个 A (x)唯一的 ID。

将映射器(前一步)完成的作业移交(调度)到“ reduce”集群节点。然后,Reduce 节点集群将进一步细化每个 A (x)部分的排序,这些部分只有在完成所有映射器任务时才会发生(当仍然有可能出现另一个 A 迷你分区时,不能实际上开始对所有从 w/A 开始的单词进行排序)。在最终排序的分区中输出结果(例如,Sorted-A,Sorted-B,等等)

完成后,再次将排序的分区合并到单个数据集中。此时,它只是 n 个文件的简单串联(如果您只处理 A-Z,那么 n 可以是26) ,等等。

中间可能有一些中间步骤... ... 我不确定:)。即在初始的还原步骤之后进一步映射和还原。

以下是有关 Terasort 的 Hadoop 实现的一些详情:

TeraSort is a standard map/reduce sort, except for a custom partitioner that uses a sorted list of N − 1 sampled keys that define the key range for each reduce. In particular, all keys such that sample[i − 1] <= key < sample[i] are sent to reduce i. This guarantees that the output of reduce i are all less than the output of reduce i+1."

所以他们的诀窍在于在地图阶段确定密钥的方式。从本质上说,它们确保了单个约简器中的每个值都可以针对所有其他约简器进行“预排序”。

我通过 James Hamilton’s Blog Post 詹姆斯 · 汉密尔顿的博客文章找到了论文的参考文献。

我在阅读谷歌的 MapReduce 论文时也有同样的问题。 @ Yuval F 回答几乎解决了我的难题。

在阅读这篇文章时,我注意到了一件事,那就是神奇的事情发生在分区中(在 map 之后,在 reduce 之前)。

The paper uses hash(key) mod R as the partitioning example, but this is not the only way to partition intermediate data to different reduce tasks.

只需在 @ Yuval F 回答中添加边界条件即可完成: 假设 min (S)和 max (S)是取样键中的最小键和最大键; 所有键 < min (S)被分区到一个 reduce 任务; 反之亦然,所有键 > = max (S)被分区到一个 reduce 任务。

没有硬限制的取样键,如最小或最大。只是,这些 R 键在所有键之间分布得更均匀,这个分布式系统更“并行”,reduce 操作符出现内存溢出问题的可能性也更小。