在 Spark 中,如何将阶段划分为任务?

让我们假设每个时间点只有一个 Spark 作业在运行。

目前为止我得到了什么

以下是我对 Spark 的理解:

  1. 创建 SparkContext时,每个工作节点启动一个执行器。 执行器是独立的进程(JVM) ,连接回驱动程序。每个执行器都有驱动程序的 jar。辞掉司机,关闭遗嘱执行人。每个执行器可以保存一些分区。
  2. 执行作业时,根据沿袭图创建执行计划。
  3. 执行作业被划分为多个阶段,其中包含尽可能多的相邻(在沿袭图中)转换和操作的阶段,但不包含洗牌。因此,阶段被洗牌分开。

image 1

我明白

  • 任务是通过序列化函数对象从驱动程序发送到执行程序的命令。
  • 执行程序反序列化(使用驱动程序 jar)命令(任务)并在分区上执行它。

但是

问题

我如何将舞台划分为这些任务?

具体来说:

  1. 任务是由转换和操作决定的,还是可以是多个转换/操作在任务中?
  2. 是由分区决定的任务(例如,每个分区每个阶段一个任务)。
  3. 任务是否由节点决定(例如,每个节点每个阶段一个任务) ?

我的想法(只是部分答案,即使是正确的)

https://0x0fff.com/spark-architecture-shuffle中,洗牌是用图像来解释的

enter image description here

我得到的印象是

每个阶段被划分为 # number-of-Partitions 任务,而不考虑节点的数量

对于我的第一张图片,我会说我有3个映射任务和3个减少任务。

对于0x0fff 的图像,我认为有8个 map 任务和3个 reduce 任务(假设只有3个橙色和3个深绿色文件)。

任何情况下都可以提问

是这样吗?但是即使这是正确的,我上面的问题也没有全部得到回答,因为它仍然是开放的,无论多个操作(例如多个映射)是在一个任务中,还是在每个操作中被分成一个任务。

别人怎么说

在 Spark 中什么是任务?Spark 的工作人员如何执行 jar 文件?和 ApacheSpark 调度程序如何将文件分割为任务?是相似的,但是我觉得我的问题在那里没有得到明确的回答。

75534 次浏览

你的大纲写得不错,可以回答你的问题

  • 需要为每个 stage的每个数据分区启动一个单独的 task 是的。考虑到每个分区可能驻留在不同的物理位置-例如 HDFS 中的块或本地文件系统的目录/卷。

请注意,Stage的提交是由 DAG Scheduler驱动的。这意味着不相互依赖的阶段可以提交给集群并行执行: 这将最大限度地提高集群上的并行能力。因此,如果我们的数据流中的操作可以同时发生,我们将期望看到多个阶段的启动。

我们可以在下面的玩具示例中看到这一点,在这个示例中,我们执行下列操作类型:

  • 加载两个数据源
  • 分别对两个数据源执行映射操作
  • 加入他们
  • 对结果执行一些映射和过滤操作
  • 保存结果

那么我们最终会有几个阶段?

  • 1阶段,每个阶段加载两个数据源并行 = 2阶段
  • 第三个阶段代表 join,在另外两个阶段代表 受供养人
  • 注意: 处理联接数据的所有后续操作都可以在 一样阶段执行,因为它们必须按顺序发生。启动其他阶段没有好处,因为它们在先前的作业完成之前不能开始工作。

这是那个玩具程序

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

这是结果的 DAG

enter image description here

现在: 多少个 任务 ? 任务的数量应该等于

(Stage * #Partitions in the stage)之和

如果我理解正确的话,有两件相关的事情会让你感到困惑:

1)是什么决定了任务的内容?

2)什么决定要执行的任务数量?

Spark 的引擎将 很简单的操作“粘合”在一起,例如:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

因此,当(延迟地)计算 rdd3时,park 将为 rdd1的每个分区生成一个任务,并且每个任务将执行过滤器和每行映射,以生成 rdd3。

任务的数量由分区的数量决定。每个 RDD 都有一定数量的分区。对于从 HDFS 读取的源 RDD (例如使用 sc.textFile (...)) ,分区数是输入格式生成的拆分数。对 RDD 的某些操作可能导致具有不同数量分区的 RDD:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

另一个例子是连接:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(大多数)改变分区数的操作涉及到洗牌,例如:

rdd2 = rdd1.repartition( 1000 )

实际发生的是,rdd1的每个分区上的任务都需要产生一个可以在下一阶段读取的最终输出,从而使 rdd2具有正好1000个分区(它们是如何做到的?大麻排序)。这边的任务有时称为“ Map (side)任务”。 稍后将在 rdd2上运行的任务将作用于一个分区(rdd2!)并且必须弄清楚如何读取/组合与该分区相关的映射端输出。这一边的任务有时被称为“减少(侧)任务”。

这两个问题是相关的: 一个阶段中的任务数量是分区的数量(对于连续的“粘合”在一起的 rdd 来说是公共的) ,而 rdd 的分区数量可以在不同阶段之间变化(例如,通过指定一些导致操作的洗牌的分区数量)。

一旦一个阶段的执行开始,它的任务就可以占据任务槽。并发任务槽的数量是 numExecators * ExecutorCores。一般来说,这些任务可以被来自不同的非依赖阶段的任务所占据。

这可能有助于你更好地理解不同的部分:

  • Stage: 是任务的集合 不同的数据子集(分区)。
  • 任务: 表示 在分布式数据集的分区上工作, Number-of-task = number-of-Partitions,或者正如您所说的“每个任务一个任务” 每个分区的阶段”。
  • 每个执行器在一个纱线容器上运行,并且 每个容器驻留在一个节点上。
  • 每个阶段使用多个执行器,每个执行器分配多个 vcore。
  • 每个 vcore 一次只能执行一个任务
  • 因此,在任何阶段,多个任务都可以并行执行。