Spark - repartition() vs coalesce()

根据Learning Spark

请记住,重新划分你的数据是一个相当昂贵的操作。 Spark还有一个名为repartition()的优化版本,它允许避免数据移动,但前提是你要减少RDD分区的数量

我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。

如果分区分布在多台机器上,并且运行coalesce(),它如何避免数据移动?

339275 次浏览

它避免了完整的洗牌。如果已知分区数量正在减少,则执行器可以安全地将数据保存在最小分区数量上,只将数据从额外的节点移到我们保留的节点上。

所以,它会是这样的:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后coalesce减少到2个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

注意,节点1和节点3不需要移动其原始数据。

这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26


scala> res16.partitions.length
res17: Int = 10


scala>  pairMrkt.partitions.length
res20: Int = 2

贾斯汀的回答很棒,这个回答更有深度。

repartition算法进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用1到12的数字创建一个DataFrame。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf在我的机器上包含4个分区。

numbersDf.rdd.partitions.size // => 4

下面是数据在分区上的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

让我们使用repartition方法进行完全洗牌,并在两个节点上获得此数据。

val numbersDfR = numbersDf.repartition(2)

下面是如何在我的机器上划分numbersDfR数据:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartition方法创建新分区,并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。

coalescerepartition的区别

coalesce使用现有分区来最小化被打乱的数据量。repartition创建新分区并进行完全洗牌。coalesce会生成数据量不同的分区(有时分区的大小有很大不同),而repartition会生成大小大致相同的分区。

coalesce还是repartition更快?

coalesce可能比repartition运行得快,但大小不等的分区通常比大小相等的分区运行得慢。在过滤了一个大型数据集之后,通常需要对数据集重新分区。我发现repartition总体上更快,因为Spark是为使用相同大小的分区而构建的。

注意:我好奇地观察到重新分区会增加磁盘上数据的大小。在对大型数据集使用重分区/合并时,请确保运行测试。

阅读这篇博文如果你想要更多的细节。

何时使用coalesce &实践中的重新划分

所有的答案都为这个经常被问到的问题增添了一些伟大的知识。

所以根据这个问题的传统时间轴,这里是我的2美分。

我找到了重新分区要比合并快,在非常具体的情况下。

在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。

这就是我的意思

if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。

当然,这个数字(20)将取决于工作人员的数量和数据量。

希望这能有所帮助。

以一种简单的方式 COALESCE:-仅用于减少分区的数量,没有数据变换,它只是压缩分区

REPARTITION:-用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。

但是你也应该确保,如果你在处理巨大的数据,将要合并的节点的数据应该是高度配置的。因为所有的数据都会加载到那些节点上,可能会导致内存异常。 虽然赔款很贵,但我还是愿意用它。

.

.

在合并和重新分区之间进行明智的选择。

repartition -建议在增加分区数量的同时使用它,因为它涉及到所有数据的洗牌。

coalesce -建议在减少分区数量的同时使用它。例如,如果你有3个分区,你想把它减少到2个,coalesce将把第3个分区的数据移动到分区1和2。分区1和分区2将保留在同一个容器中。 另一方面,repartition将打乱所有分区中的数据,因此执行器之间的网络使用将很高,这将影响性能

在减少分区数量的同时,coalescerepartition性能更好。

我想在贾斯汀和鲍尔的回答中补充一点——

repartition将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。

coalesce将使用现有分区并洗牌其中的一个子集。它不能像repartition那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。

对于所有伟大的答案,我想补充一句,repartition是利用数据并行化的最佳选择之一。而coalesce提供了一个减少分区的廉价选项,当将数据写入HDFS或其他接收器以利用大写入时,它非常有用。

我发现这在以拼花格式写数据时很有用,可以充分利用它。

对于那些从PySpark (AWS EMR)生成单个csv文件并将其保存在s3上的问题,使用重新分区会有所帮助。原因是,合并不能进行完全洗牌,但重新分区可以。从本质上讲,您可以使用重分区增加或减少分区的数量,但使用合并只能减少分区的数量(而不是1)。以下是为试图从AWS EMR写入csv到s3的任何人编写的代码:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')

代码和代码文档中可以看出,coalesce(n)coalesce(n, shuffle = false)相同,而repartition(n)coalesce(n, shuffle = true)相同

因此,coalescerepartition都可以用来增加分区的数量

使用shuffle = true,你实际上可以合并成一个更大的数字 的分区。如果你有少量的分区,这很有用, 例如100,可能有几个分区异常大

另一个需要强调的重要注意是,如果你大大减少数量的分区,你应该考虑使用coalesce打乱版本(在这种情况下与repartition相同)。这将允许你的计算被执行在父分区上并行(多任务)。

然而,如果你正在做一个激烈的合并,例如到numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,在numPartitions = 1的情况下只有一个节点)。为了避免这种情况,可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。

请参考相关答案在这里

重新分区:将数据洗牌到一个新的分区数量。

如。初始数据帧划分为200个分区。

df.repartition(500):数据将从200个分区洗牌到新的500个分区。

合并:将数据洗牌到现有的分区数量。

df.coalesce(5):数据将从剩余的195个分区转移到5个现有分区。

repartition算法对数据进行完全洗牌,并创建大小相等的数据分区。coalesce结合现有分区以避免完全洗牌。

Coalesce可以很好地使用一个具有大量分区的RDD,并将单个工作节点上的分区组合在一起,以生成一个具有较少分区的最终RDD。

Repartition将重新洗牌RDD中的数据,以产生您请求的最终分区数量。 DataFrames的分区看起来像是一个应该由框架管理的低级实现细节,但事实并非如此。当将大的dataframe过滤成小的dataframe时,你应该总是对数据进行重新分区。 你可能会经常把大的数据帧过滤成小的数据帧,所以要习惯重新分区

阅读这篇博文如果你想要更多的细节。

另一个不同之处是考虑到存在倾斜连接的情况,您必须在其之上进行合并。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。

另一种情况是,假设你在一个数据帧中保存了一个中等/大量的数据,你必须批量生成到Kafka。在某些情况下,在生成到Kafka之前,重新分区有助于collectasList。但是,当容量非常大时,重新分区可能会导致严重的性能影响。在这种情况下,直接从dataframe生成Kafka会有所帮助。

附注:Coalesce并不像在工作人员之间进行完整的数据移动那样避免数据移动。但它确实减少了洗牌的次数。我想这就是那本书的意思。

有一个重新分区的用例>>即使在@Rob的回答中提到的分区数减少的情况下也要合并,即将数据写入单个文件。

@Rob的回答暗示了一个好的方向,但我认为需要一些进一步的解释来理解引擎盖下面发生了什么。

如果你需要在写入之前过滤你的数据,那么重新分区合并更适合,因为在加载操作之前,coalesce会被下推。

例如< p >: load().map(…).filter(…).coalesce(1).save() < / p > < p >翻译: load().coalesce(1).map(…).filter(…).save() < / p >

这意味着您的所有数据将被分解到一个分区,在那里它将被过滤,失去所有的并行性。 即使是非常简单的过滤器,如column='value'.

这不会发生在重分区:load().map(…).filter(…).repartition(1).save()

在这种情况下,在原始分区上并行地进行过滤。

举个数量级的例子,在我的例子中,当从Hive表加载后过滤109M行(~105G)和~1000个分区时,运行时从合并(1)的~6h下降到重新分区(1)的~2m。

具体的例子来自这篇文章来自AirBnB,它非常好,甚至涵盖了Spark中重新分区技术的更多方面。

基本上,重分区允许您增加或减少分区的数量。重分区重新分配来自所有分区的数据,这导致完全shuffle,这是非常昂贵的操作。

Coalesce是重新分区的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在Coalesce中是最小的数据移动,但说Coalesce不做数据移动是完全错误的说法。

另一件事是通过提供分区的数量来重新分区,它试图在所有分区上均匀地重新分配数据而在Coalesce的情况下,在某些情况下我们仍然可能有倾斜的数据。

  • Coalesce使用现有分区来最小化数据量 被打乱。重新分区将创建新的分区并执行满分区 洗牌。< / p >

  • Coalesce会导致不同数据量的分区 (有时分区有许多不同的大小)和 重新分区的结果是大致相同大小的分区

  • 联合我们可以减少分区,但修复我们可以用来增加和减少分区。

合并比重新分区执行得更好。合并总是减少分区。假设你在yarn中启用动态分配,你有四个分区和执行器。如果过滤器应用于它,超过可能的一个或多个执行程序是空的,没有数据。这个问题可以通过合并而不是重新划分来解决。

以下是代码级别的一些额外细节/差异:

在这里只添加函数定义,完整的代码实现检查spark的github页面。

下面是在数据帧上重新分区的不同方法: 检查完全实现在这里.

def repartition(numPartitions: Int): Dataset[T]

每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。

 def repartition(partitionExprs: Column*): Dataset[T]

上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。

def repartitionByRange(partitionExprs: Column*): Dataset[T]

上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。

但是对于合并,我们只有以下方法在数据框架上:

def coalesce(numPartitions: Int): Dataset[T]

上述方法将返回一个新的数据集,该数据集恰好具有numPartitions分区

以下是RDD上可用于重分区和合并的方法: 检查完全实现在这里.

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]


def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)

基本上,重分区方法通过传递shuffle值为true来调用coalesce方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区

Coalesce—可以增加或减少分区 重新分区—只能增加分区

但是我想说性能纯粹是基于用例的。联合并不总是比重新划分好。