Apache Spark: map vs mapPartitions?

RDD 的mapmapPartitions的方法有什么区别? flatMap的行为是像 map还是像 mapPartitions? 谢谢。

(编辑) 也就是说,两者之间的区别(无论是语义上的还是执行方面的)是什么

  def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}

还有:

  def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
157608 次浏览

RDD 的 map 和 mapPartitions 方法之间的区别是什么?

该方法通过应用函数将源 RDD 的每个 元素转换为结果 RDD 的单个元素。地图分区将源 RDD 的每个 分区转换为结果的多个元素(可能没有)。

LatMap 的行为是像 map 还是像 mapPartitions?

两者都不是,平面地图在单个元素上工作(如 map) ,并生成结果的多个元素(如 mapPartitions)。

提示:

无论何时进行重量级初始化,都应该执行一次 对于许多 RDD元素而不是每个 RDD元素一次,如果 初始化,例如从第三方创建对象 库,不能序列化(这样 Spark 就可以跨 集群到工作节点) ,使用 mapPartitions()代替 map().mapPartitions()提供要完成的初始化 每个工作任务/线程/分区一次,而不是每个 RDD数据一次 元素的 < strong > < em > 示例: 见下文。

val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/


val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB


connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})

问题2. ABC0的行为是像 map 还是像 mapPartitions

是的,请参阅 flatmap的例子2. . 它的自我解释。

问1. RDD 的 ABC0和 mapPartitions有什么区别

map工作在每个元素级别上使用的函数,同时 mapPartitions在分区级别执行该函数。

示例场景 : 如果我们在特定的 RDD分区中有100K 个元素,那么当我们使用 map时,我们将启动映射转换所使用的函数100K 次。

相反,如果我们使用 mapPartitions,那么我们将只调用特定的函数一次,但是我们将传递所有100K 记录,并在一次函数调用中返回所有响应。

由于 map在特定函数上工作了很多次,所以性能会有所提高,特别是当函数每次执行一些代价高昂的操作时,如果我们一次传入所有元素(对于 mappartitions) ,就不需要执行这些操作。

地图

对 RDD 的每个项应用转换函数并返回 结果是一个新的 RDD。

列表变种

Def map [ U: ClassTag ](f: T = > U) : RDD [ U ]

例如:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

地图分区

这是一个专门的映射,对于每个分区只调用一次。 各个分区的整个内容可以作为 通过输入参数的连续值流(Iterarator [ T ])。 自定义函数必须返回另一个迭代器[ U ] 结果迭代器会自动转换成一个新的 RDD 注意,下面的元组(3,4)和(6,7)缺少 结果由于我们选择的分区。

preservesPartitioning指示输入函数是否保留 分区器,它应该是 false,除非这是一对 RDD 和输入 函数不会修改键。

列表变种

Def mapPartitions [ U: ClassTag ](f: 迭代器[ T ] = > 迭代器[ U ] , [分区: Boolean = false ] : RDD [ U ]

例子一

val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

例子2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)

上面的程序也可以使用 latMap 编写,如下所示。

使用平面映射的示例2

val x  = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect


res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

结论:

mapPartitions转换比 map快,因为它只调用函数一次/分区,而不是一次/元素。

延伸阅读: Foreach Vs foreachPartitions 什么时候使用?

地图 :

  1. 它一次处理一行,非常类似于 MapReduce 的 map ()方法。
  2. 每行之后从转换返回。

地图分区

  1. 它一次性处理完整的分区。
  2. 在处理完整个分区之后,只能从函数返回一次。
  3. 在处理整个分区之前,所有中间结果都需要保存在内存中。
  4. 提供类似 MapReduce 的 setup () map ()和 clean up ()函数

Map Vs mapPartitions Http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions Http://bytepadding.com/big-data/spark/spark-mappartitions/

地图:

地图转换。

这个地图一次处理一行。

在每个输入行之后映射返回。

映射不在 Memory 中保存输出结果。

映射没有办法找出然后结束服务。

// map example


val dfList = (1 to 100) toList


val df = dfList.toDF()


val dfInt = df.map(x => x.getInt(0)+2)


display(dfInt)

地图分区:

MapPartition 转换。

MapPartition 一次对一个分区工作。

MapPartition 在处理完分区中的所有行之后返回。

MapPartition 输出保留在内存中,因为它可以在处理特定分区中的所有行之后返回。

MapPartition 服务可以在返回之前关闭。

// MapPartition example


Val dfList = (1 to 100) toList


Val df = dfList.toDF()


Val df1 = df.repartition(4).rdd.mapPartition((int) => Iterator(itr.length))


Df1.collec()


//display(df1.collect())

有关详细信息,请参阅 Spark map 与 mapPartitions 转换文章。

希望这对你有帮助!