Spark: Split multiple array columns into rows

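I have a dataframe with one row and several columns. Some of the columns hold single values, and others hold lists. All list columns have the same length. I want to split each list column into separate rows, while keeping any non-list column as is.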

Sample DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode


sqlc = SQLContext(sc)


df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

What I want:

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  1|  1|  7|foo|
|  1|  2|  8|foo|
|  1|  3|  9|foo|
+---+---+---+---+

If I only had one list column, this would be easy with a single explode:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

However, if I also try to explode the c column, I end up with a dataframe whose length is the square of what I want:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

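What I want is this: for each column, take the nth element of the array in that column and add it to a new row. I've tried mapping an explode across all columns in the dataframe, but that doesn't seem to work either: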

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
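In plain Python terms, the difference between the result I get and the result I want is a cross product versus a positional zip. The snippet below is only an illustration of that difference, with no Spark involved; the variable names are made up to mirror the sample row.

```python
from itertools import product

# One "row" of the sample dataframe, as plain Python values
a, b, c, d = 1, [1, 2, 3], [7, 8, 9], "foo"

# Chaining two explodes pairs every element of b with every element of c
chained = [(a, bi, ci, d) for bi, ci in product(b, c)]

# The desired result pairs elements by position instead
wanted = [(a, bi, ci, d) for bi, ci in zip(b, c)]

print(len(chained))  # 9 rows, the square of the array length
print(wanted)        # [(1, 1, 7, 'foo'), (1, 2, 8, 'foo'), (1, 3, 9, 'foo')]
```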

Spark >= 2.4

You can replace the zip_ UDF with the arrays_zip function:

from pyspark.sql.functions import arrays_zip, col, explode


(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))
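One difference between the two approaches is worth keeping in mind, although it does not affect the question as asked, where all arrays have the same length. Python's zip, which the zip_ UDF relies on, truncates to the shortest input, whereas arrays_zip is documented to fill missing elements with null. The following is a pure-Python model of that behaviour, not Spark code; zip_longest stands in for arrays_zip here purely as an analogy.

```python
from itertools import zip_longest

b = [1, 2, 3]
c = [7, 8]  # deliberately one element shorter than b

# Python's zip stops at the shortest input, so the last element of b is dropped
udf_like = list(zip(b, c))

# zip_longest pads the shorter input with None, analogous to null padding
arrays_zip_like = list(zip_longest(b, c))

print(udf_like)         # [(1, 7), (2, 8)]
print(arrays_zip_like)  # [(1, 7), (2, 8), (3, None)]
```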

Spark < 2.4

With DataFrames and UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode


zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ]))
)


(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

With RDDs:

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

Both solutions are inefficient due to the overhead of passing data between the JVM and Python. If the array length is fixed and known in advance, you can do something like this:

from functools import reduce
from pyspark.sql import DataFrame


# Length of array
n = 3


# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

or even:

from pyspark.sql.functions import array, struct


# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))


(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))


df.withColumn("tmp", zip_and_explode("b", "c", n=3))

You'd need to use flatMap, not map, as you want to make multiple output rows out of each input row.

from pyspark.sql import Row


def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b, c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)


# sqlContext is the SQLContext instance (named sqlc in the question)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))

One-liner (for Spark >= 2.4.0):

(df.withColumn("bc", arrays_zip("b", "c"))
    .select("a", explode("bc").alias("tbc"))
    .select("a", col("tbc.b"), col("tbc.c"))
    .show())

Imports required:

from pyspark.sql.functions import arrays_zip, col, explode


Steps:

  1. Create a column bc by applying arrays_zip to columns b and c.
  2. Explode bc to get a struct column tbc.
  3. Select the required columns a, b and c (all exploded as required).

Output:

> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+