MapReduce 的简单解释?

与我的 CouchDB问题有关。

有人能用一个傻瓜能理解的术语来解释 MapReduce 吗?

76136 次浏览
  1. 取一些数据
  2. 执行某种转换,将每个数据转换为另一种数据
  3. 将这些新数据合并成更简单的数据

第二步是映射,第三步是减少。

比如说,

  1. 在路上的一对压力计上测量两个脉冲之间的时间
  2. 根据米的距离将这些时间映射为速度
  3. 把速度降低到平均速度

MapReduce 之所以分为 Map 和 Reduce,是因为不同的部分可以很容易地并行完成。(特别是当 Reduce 具有某些数学性质时。)

有关 MapReduce 的复杂但良好的描述,请参见: Google 的 MapReduce 编程模型(PDF)

让我们以 谷歌论文为例。MapReduce 的目标是能够有效地使用并行处理某种算法的处理单元负载。示例如下: 您希望提取一组文档中的所有单词及其计数。

典型的实现方式:

for each document
for each word in the document
get the counter associated to the word for the document
increment that counter
end for
end for

MapReduce 的实现:

Map phase (input: document key, document)
for each word in the document
emit an event with the word as the key and the value "1"
end for


Reduce phase (input: key (a word), an iterator going through the emitted values)
for each value in the iterator
sum up the value in a counter
end for

围绕这一点,您将拥有一个主程序,它将以“拆分”的方式对文档集进行分区,在 Map 阶段将并行处理这些文档。发出的值由辅助程序在特定于辅助程序的缓冲区中写入。然后,一旦通知缓冲区已准备好可以处理,主程序就会委托其他工作人员执行 Reduce 阶段。

每个 worker 输出(一个 Map 或者 Reduce worker)实际上是一个存储在分散式档案系统(谷歌的 gFS)或者 CouchDB 的分布式数据库中的文件。

MapReduce 是一种并行处理大量数据的方法,不需要开发人员编写除映射器和 reduce 函数之外的任何代码。

地图函数接收数据并生成一个结果,该结果保存在一个屏障中。这个函数可以与大量相同的 地图任务并行运行。然后,数据集可以是 减少到一个标量值。

所以如果你把它想象成一个 SQL 语句

SELECT SUM(salary)
FROM employees
WHERE salary > 1000
GROUP by deptname

我们可以使用 地图得到我们的薪水大于1000的员工子集 它们映射到屏障,进入群体大小的桶中。

Reduce 将对这些组进行求和,给出结果集。

这是我从谷歌论文的 大学学习笔记中挑选出来的

一直深入到 Map 和 Reduce 的基础知识。


Map 是一个函数,它将某种列表中的项目“转换”为另一种列表中的项目,并将它们放回同一种列表中。

假设我有一个数字列表: [1,2,3] ,并且我想把每个数字加倍,在这个例子中,函数“ double every number”就是函数 x = x * 2。如果没有映射,我可以写一个简单的循环

A = [1, 2, 3]
foreach (item in A) A[item] = A[item] * 2

我得到的是 A = [2,4,6]但不是写循环如果我有一个 map 函数,我可以写

A = [1, 2, 3].Map(x => x * 2)

X = > x * 2是要针对[1,2,3]中的元素执行的函数。程序接受每个条目,对它执行(x = > x * 2) ,使 x 等于每个条目,然后生成一个结果列表。

1 : 1 => 1 * 2 : 2
2 : 2 => 2 * 2 : 4
3 : 3 => 3 * 2 : 6

因此,在使用(x = > x * 2)执行 map 函数之后,您将得到[2,4,6]。


Reduce 是一个函数,它“收集”列表中的项,并对其中的 所有执行一些计算,从而将它们减少到一个值。

求和或求平均值都是 reduce 函数的实例。比如说,如果你有一个数字列表,比如[7,8,9] ,你想要它们相加,你可以写一个像这样的循环

A = [7, 8, 9]
sum = 0
foreach (item in A) sum = sum + A[item]

但是,如果您有访问 reduce 函数的权限,您可以这样编写它

A = [7, 8, 9]
sum = A.reduce( 0, (x, y) => x + y )

现在我们有点困惑为什么要传递两个参数(0和带 x 和 y 的函数)。为了使 reduce 函数有用,它必须能够接受2个条目,计算一些东西,然后将这2个条目“减少”到只有一个值,因此程序可以减少每一对,直到我们有一个单一的值。

执行程序如下:

result = 0
7 : result = result + 7 = 0 + 7 = 7
8 : result = result + 8 = 7 + 8 = 15
9 : result = result + 9 = 15 + 9 = 24

但是您不希望总是从零开始,所以第一个参数是让您指定一个种子值,特别是在第一个 result =行中的值。

如果你想求两个列表的和,它可能是这样的:

A = [7, 8, 9]
B = [1, 2, 3]
sum = 0
sum = A.reduce( sum, (x, y) => x + y )
sum = B.reduce( sum, (x, y) => x + y )

或者一个你在现实世界中更容易找到的版本:

A = [7, 8, 9]
B = [1, 2, 3]


sum_func = (x, y) => x + y
sum = A.reduce( B.reduce( 0, sum_func ), sum_func )

在数据库软件中这是一件好事,因为有了 Map Reduce 支持,您可以使用数据库而无需知道数据是如何存储在数据库中以便使用的,这就是数据库引擎的用途。

你只需要通过提供一个 Map 或者 Reduce 函数来“告诉”引擎你想要什么,然后数据库引擎就可以找到绕过数据的方法,应用你的函数,得到你想要的结果,而不需要你知道它是如何循环遍历所有记录的。

有索引、键、连接和视图以及单个数据库可以容纳的大量内容,因此通过屏蔽数据实际存储方式,可以使代码更容易编写和维护。

对于并行编程也是一样,如果你只是指定你想用数据做什么,而不是实际实现循环代码,那么底层的基础设施可以“并行化”,并在同时并行循环中为你执行你的函数。

MAP 和 REDUCE 是人类消灭最后一批恐龙时期的旧 Lisp 函数。

想象一下,你有一个城市列表,其中包括城市名称、居住人口数量和城市规模等信息:

(defparameter *cities*
'((a :people 100000 :size 200)
(b :people 200000 :size 300)
(c :people 150000 :size 210)))

现在你可能想找到人口密度最高的城市。

首先,我们使用 MAP 创建一个城市名称和人口密度列表:

(map 'list
(lambda (city)
(list (first city)
(/ (getf (rest city) :people)
(getf (rest city) :size))))
*cities*)


=>   ((A 500) (B 2000/3) (C 5000/7))

使用 REDUCE 我们现在可以找到人口密度最大的城市。

(reduce (lambda (a b)
(if (> (second a) (second b))
a
b))
'((A 500) (B 2000/3) (C 5000/7)))


=>   (C 5000/7)

结合这两部分,我们得到以下代码:

(reduce (lambda (a b)
(if (> (second a) (second b))
a
b))
(map 'list
(lambda (city)
(list (first city)
(/ (getf (rest city) :people)
(getf (rest city) :size))))
*cities*))

让我们介绍一下函数:

(defun density (city)
(list (first city)
(/ (getf (rest city) :people)
(getf (rest city) :size))))


(defun max-density (a b)
(if (> (second a) (second b))
a
b))

然后我们可以把我们的 MAP REDUCE 代码写成:

(reduce 'max-density
(map 'list 'density *cities*))


=>   (C 5000/7)

它调用 MAPREDUCE(计算由内而外) ,因此它被称为 地图缩小

我不想听起来老套,但这对我帮助很大,而且很简单:

cat input | map | reduce > output

关于 MapReduce 的 放松快点“傻瓜”介绍可以从以下网址获得: http://www.marcolotz.com/?p=67

发布一些内容:

首先,为什么最初创建 MapReduce?

基本上,谷歌需要一个解决方案,使大型计算工作易于并行化,允许数据分布在通过网络连接的多台机器上。除此之外,它还必须以透明的方式处理机器故障并管理负载平衡问题。

MapReduce 的真正优势是什么?

有人可能会说,MapReduce 魔法是基于 Map 和 Reduce 函数应用程序。我必须承认,伙计,我强烈反对。MapReduce 之所以如此流行,主要是因为它具有自动并行化和分发的能力,并结合了简单的界面。这些因素加上对大多数错误的透明失败处理使得这个框架如此流行。

文章再深入一点:

MapReduce 最初在 Google 的一篇论文中被提到(Dean & Ghemawat,2004-link here) ,作为使用并行方法和商品计算机集群在大数据中进行计算的解决方案。与用 Java 编写的 Hadoop 不同,Google 的框架是用 C + + 编写的。该文档描述了并行框架如何使用函数式编程中的 Map 和 Reduce 函数在大型数据集上运行。

在这个解决方案中,有两个主要步骤—— Map 和 Reduce ——在第一个步骤和第二个步骤(称为组合)之间有一个可选步骤。Map 步骤将首先运行,在输入键-值对中执行计算并生成一个新的输出键-值。必须记住,输入键-值对的格式不一定需要与输出格式对匹配。Reduce 步骤将汇集相同键的所有值,并在其上执行其他计算。因此,最后一步将输出键-值对。MapReduce 最简单的应用程序之一就是实现单词计数。

该应用程序的伪代码如下:

map(String key, String value):


// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, “1”);


reduce(String key, Iterator values):


// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

可以注意到,map 读取记录中的所有单词(在这种情况下,记录可以是一行) ,并将单词作为键发出,将数字1作为值发出。 稍后,reduce 将对同一键的所有值进行分组。让我们举个例子: 假设“ house”这个词在记录中出现了三次。减速器的输入是[ house,[1,1,1]]。在约简程序中,它将对 key house 的所有值求和,并给出以下键值作为输出: [ house,[3]]。

下面是 MapReduce 框架中的效果图:

Image from the Original MapReduce Google paper

正如其他一些 MapReduce 应用程序的经典例子一样,我们可以说:

‧网址接达次数

‧反向网页连结图表

•分布式 Grep

•每台主机的术语向量

为了避免过多的网络流量,本文描述了该框架应该如何尽量保持数据的局部性。这意味着它应该始终尝试确保运行 Map 作业的计算机的内存/本地存储中有数据,避免从网络获取数据。为了减少映射器的输入量,采用了前面描述的可选合并步骤。在将映射器的输出发送给 Reducers 之前,Combiner 对给定机器中映射器的输出执行计算-这可能在另一台机器中。

文档还描述了框架的元素在出现错误时应该如何工作。在论文中,这些元素被称为工人和主人。在开源实现中,它们将被划分为更具体的元素。 由于谷歌只是在论文中描述了这种方法,并没有发布它的专有软件,许多开源框架都是为了实现这种模型而创建的。例如,可以说是 Hadoop 或 MongoDB 中有限的 MapReduce 特性。

运行时应该关注非专业程序员的细节,比如对输入数据进行分区,在大型机器集上调度程序执行,处理机器故障(当然是以透明的方式) ,以及管理机器间的通信。有经验的用户可以调优这些参数,因为输入数据将如何在工作者之间进行分区。

主要概念:

容错性:它必须能够优雅地容忍机器故障。为了执行此操作,主服务器将定期 ping 工作线程。如果主人在一定时间内没有收到给定工人的响应,主人将定义该工人的工作失败。在这种情况下,所有由错误工作者完成的 map 任务都被丢弃,并交给另一个可用工作者。如果 worker 仍在处理 map 或 reduce 任务,也会发生类似的情况。注意,如果工人已经完成了它的 reduce 部分,那么所有的计算在它失败时已经完成,不需要重新设置。作为主要故障点,如果主服务器失败,则所有作业都会失败。出于这个原因,可以为主服务器定义周期性检查点,以保存其数据结构。在最后一个检查点和主故障之间发生的所有计算都将丢失。

地点:为了避免网络流量,该框架试图确保所有输入数据都可以在本地提供给将要对它们执行计算的机器。在原始描述中,它使用 Google File System (GFS) ,复制因子设置为3,块大小为64 MB。这意味着同一块64MB (在文件系统中组成一个文件)将在三台不同的机器上有相同的副本。主机知道块在哪里,并尝试在该机器中安排映射作业。如果失败,主机会尝试在任务输入数据的副本附近分配一台机器(例如,在数据机的同一机架上的一台工作机器)。

任务粒度:假设每个贴图阶段被划分为 M 个阶段,而每个 Reduce 阶段又被划分为 R 个阶段,理想的情况是 M 和 R 比工人机器的数量大得多。这是因为执行许多不同任务的工作人员改进了动态负载平衡。除此之外,它还提高了在工作失败的情况下的恢复速度(因为它完成的许多映射任务可以分散到所有其他计算机上)。

备份任务:有时,Map 或 Reduce 工作者的行为可能比集群中的其他工作者慢得多。这样可以保持总的加工时间,使其等于那台慢速机器的加工时间。最初的文章描述了一种称为备份任务的替代方案,它是在 MapReduce 操作接近完成时由主计划的。这些任务是由正在进行的任务的主计划的。因此,MapReduce 操作在主要或备份完成时完成。

柜台:有时人们可能希望数一数事件发生的次数。出于这个原因,计算创建的位置。每个工作线程中的计数器值定期传播到主服务器。主人然后聚集(是的。看起来似乎是从这个地方来的)一个成功的 map 和 reduce 任务的计数器值,并在 MapReduce 操作完成后返回给用户代码。在主状态中还有一个当前计数器值,所以观察过程的人可以跟踪它的行为。

好吧,我想以上所有的概念,Hadoop 对你来说都是小菜一碟。如果您对 MapReduce 的原始文章或其他相关内容有任何疑问,请告诉我。

如果你熟悉 Python,以下是 MapReduce 最简单的解释:

In [2]: data = [1, 2, 3, 4, 5, 6]
In [3]: mapped_result = map(lambda x: x*2, data)


In [4]: mapped_result
Out[4]: [2, 4, 6, 8, 10, 12]


In [10]: final_result = reduce(lambda x, y: x+y, mapped_result)


In [11]: final_result
Out[11]: 42

看看每个原始数据片段是如何单独处理的,在本例中,乘以2(MapReduce 的 地图部分)。基于 mapped_result,我们得出的结论是 42(MapReduce 的 减少部分)。

从这个例子中得出的一个重要结论是,每个处理块并不依赖于另一个块。例如,如果 thread_1映射 [1, 2, 3],而 thread_2映射 [4, 5, 6],两个线程的最终结果仍然是 [2, 4, 6, 8, 10, 12],但是我们有 减半的处理时间。对于 reduce 操作也可以这样说,这也是 MapReduce 在并行计算中的工作原理。