Spark Java .lang. outofmemoryerror: Java堆空间

我的集群:1个主节点,11个从节点,每个节点有6gb内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512

问题是这样的:

第一个,我从HDFS读取一些数据(2.19 GB)到RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

第二个,在这个RDD上做一些事情:

val res = imageBundleRDD.map(data => {
val desPoints = threeDReconstruction(data._2, bg)
(data._1, desPoints)
})

最后的,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

任务太多?

PS:当输入数据约为225 MB时,一切正常。

我该如何解决这个问题呢?

375961 次浏览

看看启动脚本,这里设置了一个Java堆大小,看起来你在运行Spark worker之前没有设置这个。

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM


# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

你可以找到部署脚本在这里的文档。

我有一些建议:

  • 如果你的节点配置为Spark最大6g(并为其他进程留下一点),那么使用6g而不是4g, spark.executor.memory=6g。通过检查UI确保你在使用尽可能多的内存(它会说你正在使用多少mem)
  • 尝试使用更多的分区,每个CPU应该有2 - 4个分区。IME增加分区数量通常是使程序更稳定(通常更快)的最简单方法。对于大量的数据,每个CPU可能需要超过4个分区,在某些情况下,我不得不使用8000个分区!
  • 减少预留用于缓存的内存的百分比,使用spark.storage.memoryFraction。如果你的代码中没有使用cache()persist,这个值也可能是0。它的默认值是0.6,这意味着您的堆只有0.4 * 4g内存。IME减少mem压裂通常会使OOMs消失。从spark 1.6开始,显然我们将不再需要处理这些值,spark将自动确定它们。
  • 类似于上面的随机内存分数。如果您的作业不需要太多的shuffle内存,那么将其设置为一个较低的值(这可能会导致您的shuffle溢出到磁盘,从而对速度产生灾难性的影响)。有时当它是一个洗牌操作时,你需要做相反的事情,即将它设置为较大的值,比如0.8,或者确保你允许你的洗牌溢出到磁盘(这是1.0.0以来的默认值)。
  • 注意内存泄漏,这通常是由于在lambdas中不需要关闭对象而引起的。诊断的方法是寻找“任务序列化为XXX字节”。在日志中,如果XXX大于几k或大于MB,则可能存在内存泄漏。看到https://stackoverflow.com/a/25270600/1586965
  • 与上述有关的;如果确实需要大对象,请使用广播变量
  • 如果你正在缓存较大的RDD并且可以牺牲一些访问时间,可以考虑序列化RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage。或者甚至将它们缓存到磁盘上(如果使用ssd,有时也不是那么糟糕)。
  • (先进的)与上述相关,避免String和重度嵌套结构(如Map和嵌套case类)。如果可能,尽量只使用原语类型,并索引所有非原语类型,特别是在您希望有很多重复的情况下。尽可能选择WrappedArray而不是嵌套结构。或者甚至推出你自己的序列化-你将有关于如何有效地将你的数据备份到字节,使用它!
  • (位出租汽车司机)同样,当缓存时,考虑使用Dataset来缓存你的结构,因为它将使用更有效的序列化。与之前的要点相比,这应该被视为一种hack。将你的领域知识构建到你的算法/序列化中可以将内存/缓存空间最小化100倍或1000倍,而所有Dataset可能会给你的是2倍- 5倍的内存和10倍的磁盘压缩(拼花)。

http://spark.apache.org/docs/1.2.1/configuration.html

编辑:(So I can谷歌myself更容易)下面也表明了这个问题:

java.lang.OutOfMemoryError : GC overhead limit exceeded
设置内存堆大小的位置(至少在spark-1.0.0中)在conf/spark-env中。 相关变量为SPARK_EXECUTOR_MEMORY &SPARK_DRIVER_MEMORY。 更多的文档位于部署指南

. xml文件中

此外,不要忘记将配置文件复制到所有从节点。

你应该增加驱动内存。在你的$SPARK_HOME/conf文件夹中,你应该找到文件spark-defaults.conf,根据你的主内存编辑和设置spark.driver.memory 4000m,我认为。 这就是为我解决的问题,一切都运行顺利

为了添加一个通常不被讨论的用例,我将在当地的模式下通过spark-submit提交Spark应用程序时提出一个解决方案。

根据getbook 精通Apache Spark by 拉斯科夫斯基Jacek:

您可以在本地模式下运行Spark。在这种非分布式单JVM部署模式下,Spark在同一个JVM中生成所有执行组件——驱动程序、执行程序、后端和主机。这是驱动程序用于执行的唯一模式。

因此,如果你在使用heap时遇到OOM错误,调整driver-memory就足够了,而不是executor-memory

这里有一个例子:

spark-1.6.1/bin/spark-submit
--class "MyClass"
--driver-memory 12g
--master local[*]
target/scala-2.10/simple-project_2.10-1.0.jar

你应该配置offHeap内存设置如下所示:

val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.executor.memory", "70g")
.config("spark.driver.memory", "50g")
.config("spark.memory.offHeap.enabled",true)
.config("spark.memory.offHeap.size","16g")
.appName("sampleCodeForReference")
.getOrCreate()

根据您机器的RAM可用性提供驱动程序内存和执行程序内存。如果仍然面临OutofMemory问题,可以增加offHeap大小

从广义上讲,spark Executor JVM内存可以分为两部分。Spark内存和User内存。这是由属性spark.memory.fraction控制的——值在0到1之间。 当使用图像或在spark应用程序中进行内存密集型处理时,考虑减少spark.memory.fraction。这将为应用程序工作提供更多内存。Spark可能溢出,所以它仍然可以在较少的内存共享下工作。< / p > 问题的第二部分是工作分工。如果可能的话,将数据划分为更小的块。较小的数据可能需要较少的内存。但如果这是不可能的,你是牺牲计算为内存。通常一个执行程序会运行多个内核。执行程序的总内存必须足以处理所有并发任务的内存需求。如果不能增加执行程序内存,那么可以减少每个执行程序的内核数,这样每个任务都可以使用更多的内存。 使用1核执行器进行测试,这些执行器拥有最大的内存,然后不断增加核数,直到找到最佳的核数。< / p >

在使用动态资源分配时,我经常遇到这个问题。我原以为它会利用我的集群资源来最适合这个应用程序。

但事实上,动态资源分配并没有设置驱动程序内存,而是将其保持为默认值,即1G。

我通过将spark.driver.memory设置为适合我的驱动程序内存的数字来解决这个问题(对于32GB ram,我将其设置为18G)。

可以使用spark submit命令进行设置,方法如下:

spark-submit --conf spark.driver.memory=18g

非常重要的一点是,如果你从代码中设置它,根据Spark文档-动态加载Spark属性,这个属性将不会被考虑:

Spark属性主要可以分为两种:一种是与部署相关的,比如“Spark .driver”。记忆”、“spark.executor。这些属性在运行时通过SparkConf以编程方式设置时可能不会受到影响,或者行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或spark-submit命令行选项设置;另一个主要与Spark运行时控制有关,如“Spark .task”。maxfails”,这类属性可以用任意一种方式设置。

对于上面提到的错误,我没有什么建议。

检查执行程序分配的内存可能必须处理需要比分配的内存更多的分区。

尝试查看是否有更多的shuffle是实时的,因为shuffle是昂贵的操作,因为它们涉及磁盘I/O、数据序列化和网络I/O

●使用广播连接

避免使用groupByKeys,尽量用ReduceByKey代替

●避免在任何发生洗牌的地方使用巨大的Java对象

你把你的主垃圾收集日志扔掉了吗?所以我遇到了类似的问题,我发现SPARK_DRIVER_MEMORY只设置Xmx堆。初始堆大小仍然是1G,堆大小永远不会扩大到Xmx堆。

传递“——conf”spark.driver。extraJavaOptions=-Xms20g”解决了我的问题。

Ps aux | grep Java和您将看到以下日志:=

24501 30.7 - 1.7 41782944 2318184分/ 0 Sl + 18:49 0:33 /usr/java/latest/bin/java - cp / opt /火花/ conf /: / opt / /罐/ * -Xmx30g -Xms20g火花

根据我对上面提供的代码的理解,它加载文件并进行映射操作并保存回来。没有需要shuffle的操作。此外,没有任何操作需要将数据传输到驱动程序,因此调优与shuffle或驱动程序相关的任何内容都不会产生影响。当任务太多时,驱动程序确实会有问题,但这只是在spark 2.0.2版本之前。可能会有两件事出错。

  • 遗嘱执行人只有一个或几个。增加执行程序的数量,以便将它们分配给不同的从服务器。如果你正在使用yarn,需要改变num-executors配置,或者如果你正在使用spark standalone,那么需要调整每executor的num cores和spark max cores conf。在standalone中,num executors = max cores /每executor的cores。
  • 分区的数量很少,或者可能只有一个。所以如果这个值很低,即使我们有多核,多执行器,它也不会有太大的帮助,因为并行化依赖于分区的数量。通过imagebunderdd。repartition(11)来增加分区

设置这些确切的配置有助于解决问题。

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
堆空间错误通常是由于将太多的数据带回驱动程序或执行程序而发生的。 在您的代码中,似乎没有将任何东西带回驱动程序,相反,您可能重载了使用threeDReconstruction()方法将一个输入记录/行映射到另一个输入记录/行的执行器。我不确定在方法定义中是什么,但这肯定会导致执行器的重载。 现在你有两个选项,

  1. 编辑你的代码,以更有效的方式进行三维重建。
  2. 不要编辑代码,但是给你的执行程序更多的内存,以及更多的内存开销。[spark.executor。内存或spark.driver.memoryOverhead]

我建议谨慎使用,只使用你需要的量。就内存需求而言,每个作业都是独一无二的,所以我建议根据经验尝试不同的值,每次增加2的幂(256M,512M,1G ..)等等)

您将得到一个可以工作的执行程序内存的值。尝试使用此值重新运行作业3或5次,然后再接受此配置。

简单,如果你正在使用一个脚本或juyter笔记本,然后只设置配置路径,当你开始构建一个spark会话…

spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "15g").appName('testing').getOrCreate()