Spark Dataframe 区分具有重复名称的列

因此,正如我在 Spark Dataframe 中所知,对于多个列,可以使用与下面数据帧快照中所示相同的名称:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

上面的结果是通过将数据框连接到自身创建的,您可以看到有两个 af4列。

问题是,当我尝试用 a列做更多的计算时,我无法找到选择 a的方法,我尝试了 df[0]df.select('a'),它们都返回了下面的错误消息:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

在 Spark API 中是否存在可以再次区分列和重复名称的方法?或者想办法让我改一下栏名?

254698 次浏览

在深入研究了 Spark API 之后,我发现我可以首先使用 alias为原始数据帧创建一个别名,然后使用 withColumnRenamed手动重命名别名上的每一列,这将在不引起列名重复的情况下完成 join

详情请参阅以下 火花数据框架 API:

Sql.DataFrame.alias

重命名为

然而,我认为这只是一个麻烦的解决办法,我想知道是否有更好的办法来解决我的问题。

我建议您更改 join的列名。

df1.select(col("a") as "df1_a", col("f") as "df1_f")
.join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))

产生的 DataFrame将具有 schema

(df1_a, df1_f, df2_a, df2_f)

让我们从一些数据开始:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row


df1 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=125231, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])


df2 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

有几种方法可以解决这个问题。首先,您可以使用父列明确地引用子表列:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)


##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

还可以使用表别名:

from pyspark.sql.functions import col


df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")


df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)


##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

最后,您可以通过编程方式重命名列:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))


df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)


## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+

可以使用 def drop(col: Column)方法删除重复的列,例如:

DataFrame:df1


+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+


DataFrame:df2


+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

当我将 df1和 df2连接起来时,DataFrame 将如下所示:

val newDf = df1.join(df2,df1("a")===df2("a"))


DataFrame:newDf


+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

现在,我们可以使用 def drop(col: Column)方法删除重复的列‘ a’或‘ f’,如下所示:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))

假设要连接的 DataFrames 是 df1和 df2,并且要在列‘ a’上连接它们,那么就有了2个方法

方法1

Join (df2,‘ a’,‘ left _ out’)

这是一个很棒的方法,强烈推荐。

方法2

Join (df2,df1.a = = df2.a,‘ left _ outer’) . drop (df2.a)

有一个比为你加入的所有专栏写别名更简单的方法:

df1.join(df2,['a'])

如果您所连接的键在两个表中是相同的,那么这种方法就可以工作。

Https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

这就是我们如何在 PySpark 中连接两个数据框架 在同一列名称上的方法。

df = df1.join(df2, ['col1','col2','col3'])

如果在此之后执行 printSchema(),则可以看到重复的列已被删除。

这可能不是最好的方法,但是如果您想要重命名重复的列(在 join 之后) ,您可以使用这个小函数。

def rename_duplicate_columns(dataframe):
columns = dataframe.columns
duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
for index in duplicate_column_indices:
columns[index] = columns[index]+'2'
dataframe = dataframe.toDF(*columns)
return dataframe

如果你有一个比 Glennie Helles Sindholt 的答案中描述的更复杂的用例,例如,你有其他/几个非连接列名,也是相同的,想要区分他们,同时选择最好使用别名,例如:

df3 = df1.select("a", "b").alias("left")\
.join(df2.select("a", "b").alias("right"), ["a"])\
.select("left.a", "left.b", "right.b")


df3.columns
['a', 'b', 'b']

如果两个表中的键列相同,则尝试使用以下方法(方法1) :

left. join(right , 'key', 'inner')

而不是以下(方法2) :

left. join(right , left.key == right.key, 'inner')

使用方法1的优点:

  • 在最后的数据帧中,“密钥”只显示一次
  • 易于使用的语法

使用方法1的缺点:

  • 只对键列有帮助
  • 在左联接的情况下,如果计划使用右键 null 计数,这将不起作用。在这种情况下,必须像上面提到的那样重命名一个密钥。

对我有用的东西

import databricks.koalas as ks


df1k = df1.to_koalas()
df2k = df2.to_koalas()
df3k = df1k.merge(df2k, on=['col1', 'col2'])
df3 = df3k.to_spark()

除 col1和 col2之外的所有列,如果它们来自 df1,则在它们的名称后面加上“ _ x”,如果它们来自 df2,则在它们的名称后面加上“ _ y”,这正是我所需要的。

火花3.2.1 +

我在 Spark 3.2.1中找到了使用 toDF实现这一点的简单方法

df.show()
+------+------+---------+
|number|  word|     word|
+------+------+---------+
|     1| apple|   banana|
|     2|cherry|     pear|
|     3| grape|pineapple|
+------+------+---------+


df = df.toDF(*[val + str(i) for i, val in enumerate(df.columns)])


df.show()
+-------+------+---------+
|number0| word1|    word2|
+-------+------+---------+
|      1| apple|   banana|
|      2|cherry|     pear|
|      3| grape|pineapple|
+-------+------+---------+