Exception: Missing an output location for shuffle 0 in speculation mode?

I'm running a Spark job in speculation mode. I have around 500 tasks and around 500 compressed files of 1 GB each. For 1-2 tasks I keep getting the attached error, and those tasks are then rerun dozens of times (preventing the job from completing).

Exception: Missing an output location for shuffle 0

Any idea what this problem means and how to overcome it?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

We had a similar error with Spark, but I'm not sure it's related to your issue.

We used JavaPairRDD.repartitionAndSortWithinPartitions on 100 GB of data and it kept failing similarly to your app. Then we looked at the YARN logs on the specific nodes and found out that we had some kind of out-of-memory problem, so YARN interrupted the execution. Our solution was to change/add spark.shuffle.memoryFraction 0 in .../spark/conf/spark-defaults.conf. That allowed us to handle a much larger (but unfortunately not infinite) amount of data this way.
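If you prefer to set it per application rather than in spark-defaults.conf, here is a minimal Scala sketch (note that spark.shuffle.memoryFraction only affects the legacy memory manager of older Spark releases, so check which version you are on; the app name is just a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // Same setting as the spark-defaults.conf entry above: give the shuffle
    // no dedicated in-memory budget, forcing it to spill to disk instead of
    // exhausting executor memory.
    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")
      .set("spark.shuffle.memoryFraction", "0")
    val sc = new SparkContext(conf)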

This happened to me when I gave more memory to the worker node than it had. Since it didn't have swap, Spark crashed while trying to store objects for shuffling when there was no more memory left.

The solution was either to add swap, or to configure the worker/executor to use less memory, in addition to using the MEMORY_AND_DISK storage level for several persists.
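A minimal Scala sketch of the storage-level part of that fix; the input path and RDD are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(new SparkConf().setAppName("persist-demo"))

    // Hypothetical input; any RDD you reuse across stages is a candidate.
    val records = sc.textFile("hdfs:///path/to/input")

    // Partitions that do not fit in memory are spilled to local disk
    // instead of failing when executor memory runs out.
    records.persist(StorageLevel.MEMORY_AND_DISK)

    println(records.count())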

I got the same issue on my 3-machine YARN cluster. I kept changing the RAM but the issue persisted. Finally I saw the following messages in the logs:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms

and after this, there was this message:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67

I modified the properties in spark-defaults.conf as follows:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000

That's it! My job completed successfully after this.
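If you would rather not touch spark-defaults.conf, the same properties can also be passed per job on spark-submit (the jar name below is a placeholder):

    spark-submit \
      --conf spark.yarn.scheduler.heartbeat.interval-ms=7200000 \
      --conf spark.executor.heartbeatInterval=7200000 \
      --conf spark.network.timeout=7200000 \
      your-app.jar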

In my case (standalone cluster) the exception was thrown because the file system of some Spark slaves was 100% full. Deleting everything in the spark/work folders of those slaves solved the issue.

I solved this error by increasing the allocated memory in executorMemory and driverMemory. You can do this in HUE by selecting the Spark program that is causing the problem, and in properties -> Option list you can add something like this:

--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2

Of course the values of the parameters will vary depending on your cluster's size and your needs.

I got the same problem, but none of the many answers I searched solved it. Eventually I debugged my code step by step and found that the problem was caused by the data size not being balanced across partitions, which led to the MetadataFetchFailedException in the map stage rather than the reduce stage. Just do df_rdd.repartition(nums) before reduceByKey().
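A minimal Scala sketch of that fix; the input path, the key extraction, and the value of nums are placeholders you would adapt to your data:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("repartition-before-reduce"))

    // Hypothetical skewed input turned into (key, count) pairs.
    val pairs = sc.textFile("hdfs:///path/to/input")
      .map(line => (line.split(",")(0), 1L))

    // Spread the skewed data evenly across more partitions first,
    // then run the shuffle-heavy aggregation.
    val nums = 400
    val counts = pairs.repartition(nums).reduceByKey(_ + _)

    counts.take(10).foreach(println)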

For me, I was doing some windowing on large data (about 50B rows) and getting a boatload of

ExternalAppendOnlyUnsafeRowArray:54 - Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

in my logs. Obviously 4096 can be too small for such a data size... This led me to the following JIRA:

https://issues.apache.org/jira/browse/SPARK-21595

And ultimately to the following two config options:

  • spark.sql.windowExec.buffer.spill.threshold
  • spark.sql.windowExec.buffer.in.memory.threshold

Both default to 4096; I raised them much higher (to 2097152) and things now seem to go well. I'm not 100% sure this is the same as the issue raised here, but it's another thing to try.
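For reference, a sketch of how those two options can be set when building the session (the values are just what worked for me; treat them as a starting point):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("windowing-job")
      // Both thresholds default to 4096 rows; raising them keeps the window
      // buffers in memory longer before spilling via UnsafeExternalSorter.
      .config("spark.sql.windowExec.buffer.in.memory.threshold", "2097152")
      .config("spark.sql.windowExec.buffer.spill.threshold", "2097152")
      .getOrCreate()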

In the Spark Web UI, if there is some info like Executors lost, then you have to check the YARN log to see whether your container has been killed.

If the container was killed, it is probably due to a lack of memory.

How do you find the key info in the YARN logs? For example, there might be warnings like this:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

In this case, it suggests you should increase spark.yarn.executor.memoryOverhead.
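For example, when submitting the job (the overhead value is only an illustration; tune it to your workload, and note the jar name is a placeholder):

    spark-submit \
      --executor-memory 4g \
      --conf spark.yarn.executor.memoryOverhead=1024 \
      your-app.jar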

The error arises when there is a lot of data in a particular Spark partition. The way to solve this is to do the following steps:

  1. Increase the number of shuffle partitions: --conf spark.sql.shuffle.partitions=<some-high-number, let's say 200>
  2. In normal cases the number of partitions should be set to the number of executors * the number of cores per executor. But this kind of partitioning scheme is problematic if we have a huge amount of data; see the example below.

Suppose we had the following data and three executors with 1 core each, so the number of (physical) partitions in this case would be 3:

    Data: 1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 16, 19, 22

    Partitions: 1, 2, 3

Distribution of data across the partitions (partitioning logic based on modulo 3):

    1 -> 1, 4, 7, 13, 16, 19, 22
    2 -> 2, 5, 8
    3 -> 3, 6, 9

From the above we can see that there is data skew: partition 1 has more data than the rest.

Now let's increase the number of partitions to: number of executors * number of cores per executor * 2 = 6 (in our example). These 6 partitions will be logical partitions, so each executor will have 2 logical partitions instead of 1, and data partitioning will be based on modulo 6 instead of modulo 3.

Partitions of data in each executor:

    1 -> (0, 1) -> 1, 6, 7, 13, 19
    2 -> (2, 3) -> 2, 3, 8, 9
    3 -> (4, 5) -> 4, 5, 16, 22

The increase in logical partitions leads to fair partitioning.
  3. The next thing you can do after increasing the number of shuffle partitions is to decrease the storage part of Spark memory, if you are not persisting or caching any dataframes. By default the storage part is 0.5 and the execution part is also 0.5. To reduce the storage part, set the following configuration in your spark-submit command:

        --conf spark.memory.storageFraction=0.3

  4. Apart from the above two things, you can also set the executor overhead memory: --conf spark.executor.memoryOverhead=2g

     This is off-heap memory that is used for virtual machine overheads, interned strings, etc.

  5. Apart from this, you can limit the number of files processed in a particular micro-batch by setting maxFilesPerTrigger to a smaller value, say 10; see the sketch below.
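To illustrate points 1, 3 and 5 together, here is a minimal Structured Streaming sketch in Scala. The schema, path, and format are placeholders, and maxFilesPerTrigger only applies to file-based streaming sources:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder()
      .appName("throttled-file-stream")
      .config("spark.sql.shuffle.partitions", "200")   // point 1: more shuffle partitions
      .config("spark.memory.storageFraction", "0.3")   // point 3: smaller storage fraction
      .getOrCreate()

    // File streams need an explicit schema; this one is made up.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("value", StringType)))

    val input = spark.readStream
      .schema(schema)
      .option("maxFilesPerTrigger", "10")   // point 5: at most 10 new files per micro-batch
      .json("hdfs:///path/to/landing-dir")

    input.groupBy("value").count()
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()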