map和flatMap之间的区别是什么,以及它们各自的良好用例?

谁能给我解释一下map和flatMap之间的区别,以及它们各自的良好用例是什么?

"flatten the results"是什么意思? 它有什么好处?< / p >
229451 次浏览

下面是一个区别的例子,作为spark-shell会话:

首先是一些数据——两行文本:

val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines


rdd.collect


res0: Array[String] = Array("Roses are red", "Violets are blue")

现在,map将一个长度为N的RDD转换为另一个长度为N的RDD。

例如,它将两行映射为两行长度:

rdd.map(_.length).collect


res1: Array[Int] = Array(13, 16)

但是flatMap(松散地说)将长度为N的RDD转换为N个集合的集合,然后将这些集合摊平为单个结果RDD。

rdd.flatMap(_.split(" ")).collect


res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")

我们每行有多个单词,而且每行有多行,但我们最终得到一个单词输出数组

为了说明这一点,从一个行集合到一个单词集合的flatMapping如下:

["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]

因此,flatMap的输入和输出rdd通常具有不同的大小。

如果我们试图将mapsplit函数一起使用,我们最终会得到嵌套结构(类型为RDD[Array[String]]的单词数组的RDD),因为每个输入必须只有一个结果:

rdd.map(_.split(" ")).collect


res3: Array[Array[String]] = Array(
Array(Roses, are, red),
Array(Violets, are, blue)
)

最后,一个有用的特殊情况是映射到一个可能不返回答案的函数,因此返回Option。我们可以使用flatMap过滤出返回None的元素,并从返回Some的元素中提取值:

val rdd = sc.parallelize(Seq(1,2,3,4))


def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None


rdd.flatMap(myfn).collect


res3: Array[Int] = Array(10,20)

(注意这里Option的行为很像一个只有一个元素或者没有元素的列表)

如果您正在询问RDD之间的区别。map和RDD。在Spark中,map将一个大小为N的RDD转换为另一个大小为N的RDD。如。

myRDD.map(x => x*2)

例如,如果myRDD由double组成。

而flatMap可以将RDD转换为另一个不同大小的RDD: 如:< / p >
myRDD.flatMap(x =>new Seq(2*x,3*x))

将返回大小为2*N的RDD 或者< / p >

myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )

mapflatMap是类似的,在某种意义上,它们从输入RDD中取一行并在其上应用一个函数。它们的不同之处在于map中的函数只返回一个元素,而flatMap中的函数可以返回一个元素列表(0或更多)作为迭代器。

而且,flatMap的输出被平摊。虽然flatMap中的函数返回一个元素列表,但flatMap返回一个RDD,其中以一种平坦的方式(而不是列表)包含列表中的所有元素。

通常我们在hadoop中使用字数计算示例。我将采用相同的用例,并将使用mapflatMap,我们将看到它如何处理数据的差异。

下面是示例数据文件。

hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome

上面的文件将使用mapflatMap进行解析。

使用map

>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']

输入有4行,输出大小也是4,即N个元素==> N个元素。

使用flatMap

>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']

输出与map不同。


让我们为每个键赋值1以获得单词计数。

  • fm:使用flatMap创建的RDD
  • wc:使用map创建的RDD
>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]

而RDD wc上的flatMap将给出以下不希望看到的输出:

>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]

如果使用map而不是flatMap,则无法获得单词计数。

根据定义,mapflatMap之间的区别是:

map:它通过对每个元素应用给定函数返回一个新的RDD RDD。map中的函数只返回一个项

flatMap:类似于map,它通过应用函数返回一个新的RDD 到RDD的每个元素,但输出是平坦的

map返回相同数量元素的RDD,而flatMap可能不会。

flatMap的示例用例过滤掉丢失或不正确的数据。

map的示例用例用于各种情况,其中输入和输出的元素数量是相同的。

number.csv

1
2
3
-
4
-
5

map.py添加add.csv中的所有数字。

from operator import *


def f(row):
try:
return float(row)
except Exception:
return 0


rdd = sc.textFile('a.csv').map(f)


print(rdd.count())      # 7
print(rdd.reduce(add))  # 15.0

flatMap.py使用flatMap在添加之前过滤掉缺失的数据。与以前的版本相比,增加的数字更少。

from operator import *


def f(row):
try:
return [float(row)]
except Exception:
return []


rdd = sc.textFile('a.csv').flatMap(f)


print(rdd.count())      # 5
print(rdd.reduce(add))  # 15.0

test.md为例:

➜  spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.


scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3


scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15


scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))


scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)

如果你使用map方法,你将得到test.md的行数,对于flatMap方法,你将得到字数。

map方法类似于flatMap,它们都返回一个新的RDD。map方法常用于返回一个新的RDD, flatMap方法常用于分割单词。

Flatmap和Map都转换集合。

的区别:

< p > 地图(函数) < br > 返回一个新的分布式数据集,通过函数func传递源的每个元素 < p > flatMap(函数) < br > 类似于map,但每个输入项可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)

变换函数:
地图:一个元素入->一个元素出 flatMap:一个元素进入-> 0或多个元素输出(一个集合)。

map和flatMap输出的差异:

1. __abc0

val a = sc.parallelize(1 to 10, 5)


a.flatMap(1 to _).collect()

输出:

 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

2. __abc0:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)


val b = a.map(_.length).collect()

输出:

3 6 6 3 8

对于所有想要PySpark相关的人:

示例转换:flatMap

>>> a="hello what are you doing"
>>> a.split()

['hello', 'what', 'are', 'you', 'doing']

>>> b=["hello what are you doing","this is rak"]
>>> b.split()

Traceback(最近一次调用): 文件“”,第1行,在 AttributeError: 'list'对象没有属性'split'

>>> rline=sc.parallelize(b)
>>> type(rline)

>>> def fwords(x):
...     return x.split()




>>> rword=rline.map(fwords)
>>> rword.collect()

[[‘你好’,‘什么’,‘是’,‘你’,‘做’],[‘这个’,‘是’,'爱你']]

>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()

[‘你好’,‘什么’,‘是’,‘你’,‘做’,‘这’,‘是’,‘爱’)

希望能有所帮助。

区别可以从下面的pyspark代码示例中看到:

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]




rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]
  • map(func)返回一个新的分布式数据集,该数据集通过func声明的函数传递源的每个元素。map()是单个项

其间

  • flatMap(func)类似于map,但是每个输入项可以映射到0个或多个输出项,因此func应该返回一个Sequence而不是单个项。

这可以归结为你最初的问题:你说扁是什么意思 ?

当你使用flatMap时,“多维”集合变成了“一维”集合。

val array1d = Array ("1,2,3", "4,5,6", "7,8,9")
//array1d is an array of strings


val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )


val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)

当你想使用flatMap时,

  • 你的地图功能的结果是创建多层结构
  • 但所有你想要的是一个简单的-平面-一维结构,通过删除所有的内部分组

map:它通过对RDD的每个元素应用函数来返回一个新的RDD。.map 只能返回一项。中的函数

flatMap:类似于map,它通过将一个函数应用到RDD的每个元素,但输出是扁平化的。返回一个新的RDD

另外,flatMap中的函数可以返回一个元素列表(0或更多)

例如:

sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

输出:[[1,2],[1,2,3],[1,2,3,4]]

sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()

输出:注意o/p在单个列表中被扁平化[1,2,1,2,3, 1, 2, 3, 4]

来源:https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/

RDD.map返回单个数组中的所有元素

RDD.flatMap返回数组的数组中的元素

让我们假设在text.txt文件中有文本

Spark is an expressive framework
This text is to understand map and faltMap functions of Spark RDD

使用地图

val text=sc.textFile("text.txt").map(_.split(" ")).collect

输出:

text: **Array[Array[String]]** = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD))

使用flatMap

val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect

输出:

 text: **Array[String]** = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD)

所有的例子都很好....这是一个很好的视觉插图…来源:DataFlair spark培训

Map: Map是Apache Spark中的转换操作。它应用于RDD的每个元素,并将结果作为新的RDD返回。在Map中,操作开发人员可以定义自己的自定义业务逻辑。同样的逻辑将应用于RDD的所有元素。

Spark RDD map函数接受一个元素作为输入,根据自定义代码(由开发人员指定)处理它,并每次返回一个元素。Map将一个长度为N的RDD转换为另一个长度为N的RDD。输入和输出RDD通常具有相同数量的记录。

enter image description here

使用scala创建map的示例

val x = spark.sparkContext.parallelize(List("spark", "map", "example",  "sample", "example"), 3)
val y = x.map(x => (x, 1))
y.collect
// res0: Array[(String, Int)] =
//    Array((spark,1), (map,1), (example,1), (sample,1), (example,1))


// rdd y can be re writen with shorter syntax in scala as
val y = x.map((_, 1))
y.collect
// res1: Array[(String, Int)] =
//    Array((spark,1), (map,1), (example,1), (sample,1), (example,1))


// Another example of making tuple with string and it's length
val y = x.map(x => (x, x.length))
y.collect
// res3: Array[(String, Int)] =
//    Array((spark,5), (map,3), (example,7), (sample,6), (example,7))

FlatMap:

flatMap是一个转换操作。它应用于RDD的每个元素,并返回结果为new RDD。它类似于Map,但是FlatMap允许从Map函数返回0,1或更多元素。在FlatMap操作中,开发人员可以定义自己的自定义业务逻辑。同样的逻辑将应用于RDD的所有元素。

“flatten the results”是什么意思?

FlatMap函数接受一个元素作为输入,根据自定义代码(由开发人员指定)处理它,并一次返回0个或多个元素。flatMap()将长度为N的RDD转换为另一个长度为M的RDD。

enter image description here

使用scala的flatMap示例:

val x = spark.sparkContext.parallelize(List("spark flatmap example",  "sample example"), 2)


// map operation will return Array of Arrays in following case : check type of res0
val y = x.map(x => x.split(" ")) // split(" ") returns an array of words
y.collect
// res0: Array[Array[String]] =
//  Array(Array(spark, flatmap, example), Array(sample, example))


// flatMap operation will return Array of words in following case : Check type of res1
val y = x.flatMap(x => x.split(" "))
y.collect
//res1: Array[String] =
//  Array(spark, flatmap, example, sample, example)


// RDD y can be re written with shorter syntax in scala as
val y = x.flatMap(_.split(" "))
y.collect
//res2: Array[String] =
//  Array(spark, flatmap, example, sample, example)

地图:

是一种高阶方法,它接受一个函数作为输入,并将其应用于源RDD中的每个元素。

http://commandstech.com/difference-between-map-and-flatmap-in-spark-what-is-map-and-flatmap-with-examples/

flatMap:

接受输入函数的高阶方法和转换操作。

地图

通过将函数应用到该RDD的每个元素,返回一个新的RDD。

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.map(lambda x: [(x, x), (x, x)]).collect())
[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]

flatMap

返回一个新的RDD,首先对该RDD的所有元素应用一个函数,然后将结果扁平化。 这里可以将一个元素转换为多个元素

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]