如何避免联接后重复列?

我有两个数据框,其中包括以下列:

df1.columns
//  Array(ts, id, X1, X2)

还有

df2.columns
//  Array(ts, id, Y1, Y2)

等我找到之后

val df_combined = df1.join(df2, Seq(ts,id))

我最终得到以下列: Array(ts, id, X1, X2, ts, id, Y1, Y2)。我可以预料到,公共列将被删除。还有什么额外的事情需要做吗?

119169 次浏览

这是一种预期的行为。 DataFrame.join方法等效于这样的 SQL 连接

SELECT * FROM a JOIN b ON joinExprs

如果您想忽略重复的列,只需删除它们,或者随后选择感兴趣的列。如果您想消除歧义,您可以使用父 DataFrames来访问这些内容:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???


a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

或使用别名:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

对于等价连接,存在一种特殊的快捷语法,它采用 字符串的序列:

val usingColumns: Seq[String] = ???


a.join(b, usingColumns)

或作为 单根弦

val usingColumn: String = ???


a.join(b, usingColumn)

它只保留联接条件中使用的列的一个副本。

这个问题困扰我已经有一段时间了,直到最近我才想出了一个相当简单的解决方案。

说是

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]


scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
and
scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]


scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

我可以这样做,只选择数据帧 a 中的值:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+

简单的答案(来自 关于这个问题的常见问题解答)是执行连接,其中联接的列表示为 字符串数组字符串数组(或一个字符串) ,而不是谓词。

下面是一个例子改编自数据库常见问题,但有两个连接栏,以回答原来的海报的问题。

这是 左边的数据框架:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))


val left = llist.toDF("firstname","lastname","date","duration")


left.show()


/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

这是 的数据框架:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")


right.show()


/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

下面是一个 不正确解决方案,其中连接列被定义为谓词 left("firstname")===right("firstname") && left("lastname")===right("lastname")

错误的结果是 firstnamelastname列在合并的数据框架中重复:

left.join(right, left("firstname")===right("firstname") &&
left("lastname")===right("lastname")).show


/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

正确解决方案是将连接列定义为字符串 Seq("firstname", "lastname")的数组。输出数据框架没有重复的列:

left.join(right, Seq("firstname", "lastname")).show


/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/

这是 SQL 的一个正常行为,我为此所做的是:

  • 删除或重命名源列
  • 加入我们
  • 删除重命名列(如果有)

在这里,我替换了“全名”一栏:

Java 中的一些代码:

this
.sqlContext
.read()
.parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
.drop("fullname")
.registerTempTable("data_original");


this
.sqlContext
.read()
.parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
.registerTempTable("data_v2");


this
.sqlContext
.sql(etlQuery)
.repartition(1)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);

问题所在:

SELECT
d.*,
concat_ws('_', product_name, product_module, name) AS fullname
FROM
{table_source} d
LEFT OUTER JOIN
{table_updates} u ON u.id = d.id

这是你可以做的事情,只有火花我相信(从列表中删除列) ,非常非常有帮助!

你可以简单地使用这个

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

这里的连接类型可以是

  • 左边
  • 内心
  • 充满

例如,我有两个这样的数据框架:

// df1
word   count1
w1     10
w2     15
w3     20


// df2
word   count2
w1     100
w2     150
w5     200

如果执行 fulloutjoin,那么结果如下所示

df1.join(df2, Seq("word"),"fullouter").show()


word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200

试试这个,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))

最佳实践是在加入 DF 之前使两个 DF 中的列名不同,并相应地删除它们。

df1.columns =[id, age, income]
df2.column=[id, age_group]


df1.join(df2, on=df1.id== df2.id,how='inner').write.saveAsTable('table_name')

将返回一个错误,而对于重复的列则返回错误

试试这个,试试这个:

df2_id_renamed = df2.withColumnRenamed('id','id_2')
df1.join(df2_id_renamed, on=df1.id== df2_id_renamed.id_2,how='inner').drop('id_2')

在将多个表联接在一起之后,如果遇到重复,我将通过一个简单的函数来运行它们,以便重命名 DF 中的列。或者,你也可以删除这些重复的列

其中,Names是一个包含 ['Id', 'Name', 'DateId', 'Description']列的表,而 Dates是一个包含 ['Id', 'Date', 'Description']列的表,IdDescription列在联接之后将被复制。

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

deDupeDfCols的定义如下:

def deDupeDfCols(df, separator=''):
newcols = []


for col in df.columns:
if col not in newcols:
newcols.append(col)
else:
for i in range(2, 1000):
if (col + separator + str(i)) not in newcols:
newcols.append(col + separator + str(i))
break


return df.toDF(*newcols)

生成的数据框架将包含 ['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2']列。

对不起,这个答案是在 Python 中——我不熟悉 Scala,但这是我在谷歌这个问题时出现的问题,我确信 Scala 代码与 也是没有什么不同。

“内连接”是“火花连接”中的默认连接,下面是它的简单语法。

leftDF.join(rightDF,"Common Col Nam")

对于其他连接,您可以遵循以下语法

leftDF.join(rightDF,Seq("Common Columns comma seperated","join type")

如果列 Name 不常见,则

leftDF.join(rightDF,leftDF.col("x")===rightDF.col("y),"join type")

如果有人正在使用 park-SQL 并且希望实现同样的目标,那么您可以在连接查询中使用 USING子句。

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._


val df1 = List((1, 4, 3), (5, 2, 4), (7, 4, 5)).toDF("c1", "c2", "C3")
val df2 = List((1, 4, 3), (5, 2, 4), (7, 4, 10)).toDF("c1", "c2", "C4")


df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")


spark.sql("select * from table1  inner join  table2  using (c1, c2)").show(false)


/*
+---+---+---+---+
|c1 |c2 |C3 |C4 |
+---+---+---+---+
|1  |4  |3  |3  |
|5  |2  |4  |4  |
|7  |4  |5  |10 |
+---+---+---+---+
*/