Apache Spark:内核数量vs.执行器数量

我试图理解在YARN上运行Spark作业时,内核数量和执行器数量之间的关系。

测试环境如下:

  • 数据节点数量:3个
  • 数据节点机器规格:
    • CPU: i7-4790(4个内核,8个线程)
    • 内存:32GB (8GB x 4)
    • 硬盘:8TB (2TB × 4)
    • 李< / ul > < / >
    • 网络:1 gb

    • Spark版本:1.0.0

    • Hadoop版本:2.4.0 (Hortonworks HDP 2.1)

    • Spark作业流程:sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile

    • < p >输入数据

      • 类型:单个文本文件
      • 大小:165 gb
      • 行数:454,568,833
      • 李< / ul > < / >
      • < p >输出

        • 第二个过滤器后的行数:310,640,717
        • 结果文件行数:99,848,268
        • 结果文件的大小:41GB
        • 李< / ul > < / >

        作业在以下配置下运行:

        1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3(每个数据节点的执行程序,使用的内核数量一样多)

        2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3(减少内核数量)

        3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12(更少的核心,更多的执行程序)

        运行时间:

        1. 50分钟15秒

        2. 55分钟48秒

        3. 31分钟23秒

        令我惊讶的是,(3)要快得多 我认为(1)会更快,因为在洗牌时执行器之间的通信会更少 虽然(1)的核数比(3)少,但核数不是关键因素,因为2)表现得很好

        (在pwilmot的回答之后添加了以下内容。)

        性能监视器屏幕截图如下:

        • (1)的Ganglia数据节点摘要-作业开始于04:37。

        Ganglia数据节点汇总(1)

        • (3)的Ganglia数据节点摘要-作业开始于19:47。请忽略之前的图表。

        Ganglia数据节点汇总(3)

        图表大致分为两部分:

        • 第一:从start到reduceByKey: CPU密集型,没有网络活动
        • 第二:reduceByKey: CPU降低后,网络I/O完成。

        如图所示,(1)可以使用尽可能多的CPU功率。所以,这可能不是线程数量的问题。

        如何解释这一结果?

152856 次浏览

我自己没有玩过这些设置,所以这只是推测,但如果我们把这个问题看作是分布式系统中的正常内核和线程,那么在您的集群中,您最多可以使用12个内核(4 * 3台机器)和24个线程(8 * 3台机器)。在前两个例子中,您为作业提供了相当数量的内核(潜在的计算空间),但是在这些内核上运行的线程(作业)数量非常有限,以至于您无法使用分配的大量处理能力,因此即使分配了更多的计算资源,作业也会变慢。

您提到您关心的是shuffle步骤—虽然在shuffle步骤中限制开销很好,但通常更重要的是利用集群的并行化。考虑一个极端的情况——一个没有shuffle的单线程程序。

我认为其中一个主要原因是地方性。您的输入文件大小为165G,文件的相关块当然分布在多个datanode上,更多的执行器可以避免网络复制。

尝试设置executor num等于blocks count,我认为可以更快。

当你在HDFS上运行你的spark应用程序时,根据桑迪Ryza

我注意到HDFS客户端有大量并发的问题 线程。粗略的猜测是最多每个执行者5个任务可以 实现全写吞吐量,所以最好保持数量

所以我相信你的第一个配置比第三个配置慢是因为HDFS的I/O吞吐量不好

为了让所有这些更具体一点,这里有一个配置Spark应用程序以使用尽可能多的集群的示例 想象一个集群,六个节点运行nodemanager,每个 配备16核64GB内存。NodeManager容量, yarn.nodemanager.resource。memory-mb和 yarn.nodemanager.resource。Cpu-vcores,应该设置为63 * 1024 = 64512(兆字节)和15。我们避免100%分配 因为节点需要一些资源 资源来运行操作系统和Hadoop守护进程。在这种情况下,我们留下 千兆字节和这些系统进程的核心。Cloudera Manager有帮助 通过计算这些并配置这些YARN属性 自动. < / p > 第一个冲动可能是使用——num-executors 6 ——executor-cores 15——executor-memory 63G。然而,这是错误的方法,因为:

63GB +执行程序内存开销将不适合63GB容量 nodemanager的。应用程序主程序将占用其中一个核心 这意味着没有空间容纳15核的执行程序 在那个节点上。每个执行器15个内核会导致糟糕的HDFS I/O 吞吐量。< / p > 更好的选择是使用——num-executors 17 ——execuker -cores 5——execuker -memory 19G。为什么?< / p >

该配置导致在所有节点上有三个执行程序,除了一个 AM遗嘱将有两位执行人。 ——executor-memory被导出为(每个节点63/3个executor) = 21。21 * 0.07 = 1.47。21 - 1.47 ~ 19.

Cloudera博客如何:调优Apache Spark作业(第2部分)中的一篇文章给出了解释。

Spark动态分配提供了灵活性,动态分配资源。在此范围内,可以给出最小和最大执行程序的数量。同样,在应用程序启动时必须启动的执行程序的数量也可以给定。

请阅读以下相同的内容:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

我认为前两个配置有一个小问题。线程和核心的概念如下。线程的概念是,如果内核是理想的,那么使用该内核来处理数据。所以在前两种情况下,内存没有被充分利用。如果你想对这个例子进行基准测试,请选择每台机器上拥有多于10个核的机器。然后做标杆。

但是不要给每个执行器超过5个内核,这会在i/o性能上遇到瓶颈。

所以做这个基准测试的最好机器可能是有10个核的数据节点。

数据节点机器规格: CPU: i7-4790(核数:10,线程数:20) 内存:32GB (8GB x 4) 硬盘:8TB (2TB × 4)

RStudio的Sparklyr包页面处可用的优秀的资源中:

火花的定义:

提供一些简单的定义可能是有用的 Spark命名法:

节点:服务器

工作者节点:作为集群的一部分并且可用的服务器 运行Spark job

主节点:协调Worker节点的服务器。

遗嘱执行人:节点内的一种虚拟机。一个节点可以有 多个执行者。< / p >

司机节点:启动Spark会话的节点。通常情况下, 这将是sparklyr所在的服务器

驱动程序(执行器):驱动节点也将显示在执行器中 列表。< / p >

简短的回答:我认为tgbaggio是正确的。您在执行器上达到了HDFS的吞吐量限制。

我认为答案可能比这里的一些建议要简单一些。

对我来说,线索在集群网络图中。对于运行1,利用率稳定在~ 50m字节/秒。对于运行3,稳定的利用率增加了一倍,约为100 M字节/秒。

DzOrd共享的cloudera博客文章中,你可以看到这段重要的引用:

我注意到HDFS客户端有大量并发线程的问题。一个粗略的猜测是,每个执行器最多5个任务可以实现完整的写吞吐量,所以最好将每个执行器的内核数保持在这个数字以下。

那么,让我们做一些计算,看看如果这是真的,我们期望的性能是什么。


运行1:19 GB, 7核,3个执行器

  • 3个执行者x 7个线程= 21个线程
  • 每个executor有7个内核,我们期望有限的IO到HDFS(最多5个内核)
  • 有效吞吐量 ~= 3个executor x 5线程= 15个线程

运行3:4 GB, 2核,12个执行器

  • 2个executor x 12个线程= 24个线程
  • 每个executor有2个内核,所以HDFS的吞吐量是可以的
  • 有效吞吐量 ~= 12 executor x 2线程= 24线程

如果作业100%受到并发性(线程数)的限制。我们期望运行时与线程数完全成反比。

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

ratio_num_threads ~= inv_ratio_runtime,看起来我们的网络有限。

同样的效果解释了运行1和运行2之间的差异。


运行2:19 GB, 4核,3个执行器

  • 3个执行者x 4个线程= 12个线程
  • 每个executor有4个内核,可以IO到HDFS
  • 有效吞吐量 ~= 3个executor x 4线程= 12个线程

比较有效线程数和运行时数:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

它没有上次的比较那么完美,但是当失去线程时,我们仍然可以看到类似的性能下降。

现在是最后一点:为什么线程越多性能越好,尤其是线程数比cpu数多?

并行性(我们通过将数据划分到多个CPU上得到的)和并发性(当我们使用多个线程在单个CPU上工作时得到的)之间的区别的一个很好的解释是由Rob Pike: 并发性不是并行性提供的这篇伟大的文章。

简单的解释是,如果一个Spark作业与一个文件系统或网络交互,那么CPU会花很多时间等待与这些接口的通信,而不是花很多时间实际“工作”。通过同时为这些cpu提供多个任务,它们将花费更少的等待时间和更多的工作时间,从而获得更好的性能。

在2.)配置中,您正在减少并行任务,因此我认为您的比较是不公平的。 使——num-executors至少为5。 因此,与1.)配置中的21个任务相比,您将有20个任务正在运行。

.

.

另外,请相应地计算执行程序内存。