如何定义数据框架的分区?

我已经开始在 Spark 1.4.0中使用 Spark SQL 和 DataFrames。我想用 Scala 在 DataFrames 上定义一个自定义分区程序,但是没有看到如何做到这一点。

我正在使用的一个数据表包含按帐户分列的事务列表,类似于下面的示例。

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

至少在最初,大多数计算将在帐户内的事务之间进行。所以我希望对数据进行分区,这样一个帐户的所有事务都在同一个 Spark 分区中。

但我不知道该怎么定义。DataFrame 类有一个名为“ rePartitionInt”的方法,您可以在该方法中指定要创建的分区数。但是我没有看到任何可用的方法来为 DataFrame 定义自定义分区程序,比如可以为 RDD 指定自定义分区程序。

源数据存储在 Parquet。我确实看到,在向 Parquet 编写 DataFrame 时,您可以指定一个列来进行分区,因此可以假设我可以告诉 Parquet 通过“ Account”列来对数据进行分区。但是可能有数百万个账户,如果我理解正确的话,Parquet 会为每个账户创建一个不同的目录,所以这听起来不像是一个合理的解决方案。

有没有办法让 Spark 对这个 DataFrame 进行分区,以便 Account 的所有数据都在同一个分区中?

196854 次浏览

使用返回的 DataFrame:

yourDF.orderBy(account)

没有明确的方法可以在 DataFrame 上使用 partitionBy,只能在 PairRDD 上使用,但是当你对 DataFrame 排序时,它会在它的 LogicalPlan 中使用它,当你需要对每个帐户进行计算时,它会有所帮助。

我只是偶然发现了同样的问题,用一个数据框架,我想按帐户划分。 我假设当您说“想要将数据分区,以便一个帐户的所有事务都在同一个 Spark 分区中”时,您想要它的规模和性能,但是您的代码并不依赖于它(如使用 mapPartitions()等) ,对吗?

在 Spark < 1.6中如果你创建一个 HiveContext,而不是老式的 SqlContext,你可以使用 HiveQL DISTRIBUTE BY colX...(确保每个 N 还原得到不重叠的 x 范围)和 CLUSTER BY colX...(分布式和排序式的快捷方式) ;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

不知道这和 Spark DF api 有什么关系。普通 SqlContext 不支持这些关键字(注意,使用 HiveContext 不需要 hive 元存储)

编辑: Spark 1.6 + 现在在本地 DataFrame API 中有这个

所以从某种回答开始:)-你不能

我不是一个专家,但就我对 DataFrame 的理解而言,它们不等于 rdd,而且 DataFrame 没有 Partifier 这样的东西。

一般来说,DataFrame 的想法是提供另一个抽象级别来处理这些问题本身。DataFrame 上的查询被转换为逻辑计划,然后再进一步转换为 RDD 上的操作。您建议的分区可能会自动应用,或者至少应该应用。

如果您不相信 SparkSQL 会提供某种最佳作业,那么您总是可以按照注释中的建议将 DataFrame 转换为 RDD [ Row ]。

我可以使用 RDD 来完成这项工作,但我不知道这是否是一个可以接受的解决方案。 一旦有了可作为 RDD 使用的 DF,就可以应用 repartitionAndSortWithinPartitions来执行自定义的数据重新分区。

下面是我使用的一个例子:

class DatePartitioner(partitions: Int) extends Partitioner {


override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}


override def numPartitions: Int = partitions
}


myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)

Spark > = 2.3.0

SPARK-22614 暴露范围分区。

val partitionedByRange = df.repartitionByRange(42, $"k")


partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 暴露 数据源 API v2中的外部格式分区。

Spark > = 1.6.0

在 Spark > = 1.6中,可以使用按列分区进行查询和缓存。参见: 使用 repartition方法的 SARK-11410星火 -4849:

val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")


val partitioned = df.repartition($"k")
partitioned.explain


// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDs不同,Spark Dataset(包括 Dataset[Row] a.k.a DataFrame)目前不能使用自定义分区器。您通常可以通过创建一个人工分区列来解决这个问题,但它不会提供同样的灵活性。

星火 < 1.6.0:

您可以做的一件事情是在创建 DataFrame之前对输入数据进行预分区

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner


val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))


val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))


val partitioner = new HashPartitioner(5)


val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values


val df = sqlContext.createDataFrame(partitioned, schema)

由于从 RDD创建 DataFrame只需要一个简单的映射阶段,因此应该保留现有的分区布局 * :

assert(df.rdd.partitions == partitioned.partitions)

与重新分区现有 DataFrame的方法相同:

sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)

所以看起来并非不可能。问题仍然是它是否有意义。我认为,大多数时候情况并非如此:

  1. 重新分区是一个昂贵的过程。在一个典型的场景中,大多数数据必须被序列化、洗牌和反序列化。另一方面,可以从预分区数据中获益的操作数量相对较少,如果内部 API 的设计不能利用这个属性,那么操作数量就会进一步受到限制。

    • 但这需要内部支持,
    • 带匹配分区器的窗口函数调用。与上面一样,仅限于单个窗口定义。但是它已经在内部分区了,所以预分区可能是多余的,
    • 使用 GROUP BY进行简单的聚合——可以减少临时缓冲区的内存占用 * * ,但总体成本要高得多。或多或少等效于 groupByKey.mapValues(_.reduce)(当前行为)与 reduceByKey(预分区)。在实践中不太可能有用。
    • 因为它看起来像是在使用行程长度编码,所以应用 OrderedRDDFunctions.repartitionAndSortWithinPartitions可以改善数据压缩的压缩比。
  2. 性能高度依赖于键的分布。如果它是倾斜的,它将导致一个次优的资源利用率。在最坏的情况下,根本不可能完成这项工作。

  3. 使用高级声明性 API 的全部意义在于将您自己与低级实现细节隔离开来。正如 @ dwysakowicz@ RomiKuntsman已经提到的,优化是 催化剂优化器的工作。这是一个相当复杂的野兽,我真的怀疑你可以很容易地改进,而不深入到它的内部。

相关概念

使用 JDBC 源进行分区 :

JDBC 数据源支持 predicates参数:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

它为每个谓词创建一个 JDBC 分区。请记住,如果使用单个谓词创建的集合没有不相交,则会在结果表中看到重复项。

DataFrameWriter中的 partitionBy方法 :

Spark DataFrameWriter提供了 partitionBy方法,可用于在写入时对数据进行“分区”。它使用提供的列集分隔写入时的数据

val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")


df.write.partitionBy("k").json("/tmp/foo.json")

这允许基于键的查询的谓词下推读取:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

但它不等同于 DataFrame.repartition,特别是像:

val cnts = df1.groupBy($"k").sum()

仍然需要 TungstenExchange:

cnts.explain


// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

DataFrameWriter中的 bucketBy方法 (Spark > = 2.0) :

bucketBy具有与 partitionBy类似的应用程序,但它只适用于表(saveAsTable)。桶状信息可用于优化连接:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)


df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")


// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* 我说的 分区布局只是一个数据分布。 partitioned RDD 不再是一个分区器。 * * 假设没有早期投影。如果聚合只覆盖列的一小部分,那么可能没有任何收益。