Get the current number of partitions of a DataFrame

Is there any way to get the current number of partitions of a DataFrame? I checked the DataFrame javadoc (Spark 1.6) and didn't find a method for that, or did I miss it? (For JavaRDD, there is a getNumPartitions() method.)


You need to call getNumPartitions() on the DataFrame's underlying RDD, e.g., df.rdd.getNumPartitions(). In the case of Scala, this is a parameterless method: df.rdd.getNumPartitions.
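A minimal, self-contained sketch (using the Spark 2.x SparkSession API for brevity; on Spark 1.6 you would create a SQLContext instead, but df.rdd.getNumPartitions works the same way) — the object name and sample data here are purely illustrative:

import org.apache.spark.sql.SparkSession

object PartitionCountExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PartitionCountExample")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3, 4, 5).toDF("n")

    // The DataFrame itself exposes no partition accessor; go through its RDD.
    println(df.rdd.getNumPartitions) // parameterless in Scala

    spark.stop()
  }
}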

Convert to RDD, then get the partitions length:

DF.rdd.partitions.length
val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

df.rdd.getNumPartitions

dataframe.rdd.partitions.size is another alternative apart from df.rdd.getNumPartitions() or df.rdd.partitions.length.
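For completeness, a quick sketch (assuming an existing SparkSession named spark) showing that these accessors report the same value:

import spark.implicits._

val df = Seq(1, 2, 3).toDF("n")

// All of these go through the DataFrame's underlying RDD and agree.
println(df.rdd.getNumPartitions)
println(df.rdd.partitions.size)
println(df.rdd.partitions.length)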

Let me explain this with a full example...

val x = (1 to 10).toList
val numberDF = x.toDF("number")
numberDF.rdd.partitions.size // => 4

To see how many partitions we got above and how the data is split among them, save that DataFrame as CSV:

numberDF.write.csv("/Users/Ram.Ghadiyaram/output/numbers")

Here is how the data is separated across the different partitions:

Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10
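If you want to see the same breakdown without writing files, a small sketch (reusing numberDF from above; the variable names are illustrative) using mapPartitionsWithIndex on the underlying RDD:

// Collect (partitionIndex, value) pairs on the driver -- only do this on small data.
numberDF.rdd
  .mapPartitionsWithIndex { (idx, rows) => rows.map(row => (idx, row.getInt(0))) }
  .collect()
  .groupBy(_._1)
  .toSeq
  .sortBy(_._1)
  .foreach { case (idx, pairs) =>
    println(s"Partition $idx: ${pairs.map(_._2).mkString(", ")}")
  }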

Update:

@Hemanth asked a good question in the comments: basically, why is the number of partitions 4 in the case above?

Short answer: it depends on the environment you are executing in. Since I used local[4], I got 4 partitions.

Long answer:

I was running the above program on my local machine and used local[4] as the master, so it used 4 partitions.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName(this.getClass.getName)
  .config("spark.master", "local[4]")
  .getOrCreate()

If it is spark-shell with --master yarn, I got the number of partitions as 2.

Example: spark-shell --master yarn, then the same commands again:

scala> val x = (1 to 10).toList
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val numberDF = x.toDF("number")
numberDF: org.apache.spark.sql.DataFrame = [number: int]

scala> numberDF.rdd.partitions.size
res0: Int = 2
  • Here 2 is the default parallelism of Spark.
  • Spark decides how many partitions to distribute the data over (based on a hash partitioner). If you are running with --master local, it tries to allocate as many partitions as Runtime.getRuntime.availableProcessors(), i.e. local[Runtime.getRuntime.availableProcessors()]. If 12 processors are available but your list only contains the numbers 1 to 10, only 10 partitions will be created.

NOTE:

If you are on a 12-core laptop like the one where I am executing this Spark program, the default number of partitions/tasks is the number of all available cores, i.e. 12 (that is, local[*] or s"local[${Runtime.getRuntime.availableProcessors()}]"). But in this case there are only 10 numbers, so it will be limited to 10.
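To check these defaults on your own setup, a small sketch (assuming an existing SparkSession named spark; smallDF is just an illustrative name):

// What Spark falls back to when no explicit partition count is given.
println(spark.sparkContext.defaultParallelism)

// As explained above, a local Seq turned into a DataFrame does not get
// more partitions than it has rows, so this prints at most 10.
import spark.implicits._
val smallDF = (1 to 10).toList.toDF("number")
println(smallDF.rdd.getNumPartitions)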

Keeping all these pointers in mind, I would suggest you try it on your own.

One more interesting way to get the number of partitions is to use the mapPartitions transformation. Sample code:

val x = (1 to 10).toList
val numberDF = x.toDF()
// Emit a single 1 per partition and sum them: the result equals the number of partitions.
numberDF.rdd.mapPartitions(x => Iterator[Int](1)).sum()

Spark experts are welcome to comment on its performance.
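A related sketch (not part of the answer above, just another option to compare): glom turns each partition into an array, so the outer length is the partition count and each inner array's size is the number of rows in that partition.

// One array per partition; collect only on small data.
val perPartitionCounts = numberDF.rdd.glom().map(_.length).collect()
println(perPartitionCounts.length)         // number of partitions
println(perPartitionCounts.mkString(", ")) // rows per partition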