Removing duplicate columns after a DF join in Spark

When you join two DFs with similar column names:

df = df1.join(df2, df1['id'] == df2['id'])

The join works fine, but you can't call the id column, because it is ambiguous and you get the following exception:

Exception: "Reference 'id' is ambiguous, could be: id#5691, id#5918."

This makes id no longer usable...
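For example, selecting the column by name after the join above fails (a minimal sketch):

# the joined frame now contains two columns named id:
df.select('id')   # raises: Reference 'id' is ambiguous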

The following function solves the problem:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

What I don't like about it is that I have to iterate over the column names and delete them one by one. This feels really clunky.

Do you know of a more elegant solution that either joins and removes the duplicates, or deletes multiple columns without iterating over each of them?


If the join columns in both data frames have the same names and you only need an equi-join, you can specify the join columns as a list, in which case the result will keep only one of the join columns:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+


df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+


df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

Otherwise you need to give the joined data frames aliases and refer to the duplicated columns through those aliases later:

df1.alias("a").join(
df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

The code below works with Spark 1.6.0 and above.

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+


storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|    64 E Illinos Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

Assuming, in this example, that the name of the shared column is the same:

joined = salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()


+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

.join will prevent the duplication of the shared column.
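For contrast, a minimal sketch (same example frames as above) of what happens when you join on an expression instead of the column-name list; both copies of Store are then kept:

# joining on an expression keeps BOTH copies of the shared column
joined_dup = salespeople_df.join(
    storeaddress_df,
    salespeople_df['Store'] == storeaddress_df['Store']
)
# joined_dup.columns -> ['Num', 'Name', 'Store', 'Store', 'Address']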

Let's assume that you want to remove the column Num in this example; you can just use .drop('colname'):

joined = joined.drop('Num')
joined.show()


+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|    64 E Illinos Ave|
|  100| Karen|    64 E Illinos Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+

Assuming 'a' is a dataframe with a column 'id' and 'b' is another dataframe with a column 'id', I use the following two methods to remove duplicates:

Method 1: Use a string join expression instead of a boolean expression. This automatically removes the duplicate column for you:

a.join(b, 'id')

Method 2: Rename the column before the join and drop it afterwards:

b = b.withColumnRenamed('id', 'b_id')  # withColumnRenamed returns a new DF
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')

With df.join(other, on, how): when on is a column-name string or a list of column-name strings, the returned dataframe will prevent duplicate columns. When on is a join expression, it will result in duplicate columns. We can use .drop(df.a) to drop the duplicate column. Example:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)

After I've joined multiple tables together, I run them through a simple function that walks the columns from left to right and drops any it encounters as duplicates. Alternatively, you could rename these columns instead (a sketch of that variant follows the example below).

Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after being joined.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.write.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where dropDupeDfCols is defined as:

def dropDupeDfCols(df):
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # temporarily rename every column to its index so the duplicates
    # can be dropped by position, then restore the de-duplicated names
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Date'].
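As mentioned above, you could rename the duplicates instead of dropping them; here is a rough sketch of that variant (hypothetical helper name renameDupeDfCols, untested):

def renameDupeDfCols(df):
    # keep every column, but give repeated names a numeric suffix
    # so both copies stay addressable after the join
    seen = {}
    newcols = []
    for c in df.columns:
        if c in seen:
            seen[c] += 1
            newcols.append(c + '_' + str(seen[c]))  # e.g. second Id -> Id_1
        else:
            seen[c] = 0
            newcols.append(c)
    return df.toDF(*newcols)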

In my case I had a dataframe with multiple duplicate columns after joins, and I was trying to save that dataframe in CSV format, but because of the duplicate columns I was getting an error. I followed the steps below to drop the duplicate columns. The code is in Scala.

1) Rename all the duplicate columns and make a new dataframe
2) Make a separate list of all the renamed columns
3) Make a new dataframe with all columns (including the renamed ones from step 1)
4) Drop all the renamed columns

private def removeDuplicateColumns(dataFrame: DataFrame): DataFrame = {
  var allColumns: mutable.MutableList[String] = mutable.MutableList()
  val dup_Columns: mutable.MutableList[String] = mutable.MutableList()
  dataFrame.columns.foreach((i: String) => {
    if (allColumns.contains(i)) {
      // rename the second and later occurrences with a "dup_" prefix
      allColumns += "dup_" + i
      dup_Columns += "dup_" + i
    } else {
      allColumns += i
    }
  })
  val columnSeq = allColumns.toSeq
  val df = dataFrame.toDF(columnSeq: _*)
  val unDF = df.drop(dup_Columns: _*)
  unDF
}

To call the above function, use the code below and pass in your dataframe that contains the duplicate columns:

val uniColDF = removeDuplicateColumns(df)

If you join on a list or string, duplicate columns are automatically removed. This is a Scala solution; you could translate the same idea into any language:

// get a list of duplicate columns or use a list/seq
// of columns you would like to join on (note that this list
// should include columns for which you do not want duplicates)
val duplicateCols = df1.columns.intersect(df2.columns)

// no duplicate columns in the resulting DF
df1.join(df2, duplicateCols.distinct.toSeq)
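The same idea as a minimal PySpark sketch, in case you are working in Python:

# join on the intersection of the two schemas, so only one copy
# of each shared column survives the join
duplicate_cols = [c for c in df1.columns if c in df2.columns]
result = df1.join(df2, duplicate_cols)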

Here is a simple solution for removing the duplicate column:

final_result = df1.join(df2, df1['subjectid'] == df2['subjectid'], "left").drop(df1['subjectid'])

Spark SQL version of this answer:

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("select * from t1 inner join t2 using (id)").show()


# +---+----+----+
# | id|val1|val2|
# +---+----+----+
# |  1|   2|   2|
# |  1|   2|   3|
# |  2|   3|   4|
# +---+----+----+

This works for me when multiple columns are used in the join and I need to drop more than one column that is not of string type:

final_data = mdf1.alias("a").join(
    df3.alias("b"),
    (mdf1.unique_product_id == df3.unique_product_id) &
    (mdf1.year_week == df3.year_week), "left"
).select("a.*", "b.promotion_id")

Use a.* to select all columns from one table, and choose specific columns from the other table.