Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

Let's say I have a rather large dataset in the following form:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

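What I would like to do is remove duplicate rows based on the values of the first, third and fourth columns only.

Removing entirely duplicate rows is straightforward: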

data = data.distinct()

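which removes row 5 or row 6 only if the two are identical in every column.

But how do I remove duplicate rows based only on columns 1, 3 and 4? That is, remove either one of these: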

('Baz',22,'US',6)
('Baz',36,'US',6)

In Python (pandas), this can be done by specifying the columns with .drop_duplicates(). How can the same be achieved in Spark?


From your question, it is unclear which columns you want to use to determine duplicates. The general idea behind the solution is to create a key from the values of the columns that identify duplicates. Then you can use the reduceByKey or reduce operations to eliminate duplicates.

Here is some code to get you started:

def get_key(x):
    # build a key from columns 1, 3 and 4
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

Now, you have a key-value RDD that is keyed by columns 1,3 and 4. The next step would be either a reduceByKey or groupByKey and filter. This would eliminate duplicates.

r = m.reduceByKey(lambda x,y: x)  # keep a single row per key
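
One caveat about the key: joining the fields into a string with no separator can make different column values collide. The sketch below is a Spark-free illustration in plain Python; `string_key`, `tuple_key`, `dedupe` and the `colliding` rows are hypothetical names and data chosen for this example, and `dedupe` only imitates what map followed by reduceByKey does on a single machine.

```python
def string_key(row):
    # Key as built in the answer above: fields joined with no separator.
    return "{0}{1}{2}".format(row[0], row[2], row[3])


def tuple_key(row):
    # Alternative key (an assumption, not from the answer): a tuple keeps
    # the boundaries between fields.
    return (row[0], row[2], row[3])


def dedupe(rows, key_fn):
    # Local stand-in for map + reduceByKey(lambda x, y: x):
    # keep the first row seen for each key.
    kept = {}
    for row in rows:
        kept.setdefault(key_fn(row), row)
    return list(kept.values())


# Hypothetical rows: they differ in columns 3 and 4, yet both
# concatenate to the string "FooUS11".
colliding = [("Foo", 41, "US", 11), ("Foo", 39, "US1", 1)]

print(len(dedupe(colliding, string_key)))  # the two rows are merged
print(len(dedupe(colliding, tuple_key)))   # the two rows stay separate
```

On the RDD itself, the same idea would read data.map(lambda x: ((x[0], x[2], x[3]), x)).reduceByKey(lambda x, y: x).values(). Note that Spark does not promise which of the duplicate rows survives the reduce, whereas this local version always keeps the first one it sees.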

I know you already accepted the other answer, but if you want to do this as a DataFrame, just use groupBy and agg. Assuming you had a DF already created (with columns named "col1", "col2", etc) you could do:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

Note that in this case, I chose the Max of col2, but you could do avg, min, etc.
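
To make the effect of that grouping concrete, here is a small Spark-free sketch in plain Python applied to the question's data. The function name `max_col2_per_group` is hypothetical, and the code only mimics the group-then-max logic, not Spark's execution or the order in which Spark would return rows.

```python
# The rows from the question: (col1, col2, col3, col4).
data = [("Foo", 41, "US", 3),
        ("Foo", 39, "UK", 1),
        ("Bar", 57, "CA", 2),
        ("Bar", 72, "CA", 2),
        ("Baz", 22, "US", 6),
        ("Baz", 36, "US", 6)]


def max_col2_per_group(rows):
    # Group on (col1, col3, col4) and remember the largest col2 per group.
    best = {}
    for col1, col2, col3, col4 in rows:
        key = (col1, col3, col4)
        if key not in best or col2 > best[key]:
            best[key] = col2
    # Rebuild rows in the original column order.
    return [(k[0], v, k[1], k[2]) for k, v in best.items()]


result = max_col2_per_group(data)
for row in result:
    print(row)
```

The two Bar rows and the two Baz rows each collapse into one, carrying the larger col2 value, while the two Foo rows stay apart because they differ in col3 and col4.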

Agree with David. To add on, we may not always want to groupBy every column other than the one(s) in the aggregate function; that is, we may want to remove duplicates purely based on a subset of columns while retaining all columns of the original DataFrame. A better way to do this is the dropDuplicates DataFrame API, available since Spark 1.4.0.

For reference, see: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

PySpark does include a dropDuplicates() method, which was introduced in 1.4. https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+


>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

I used the built-in function dropDuplicates(). The Scala code is given below:

val data = sc.parallelize(List(("Foo",41,"US",3),
                               ("Foo",39,"UK",1),
                               ("Bar",57,"CA",2),
                               ("Bar",72,"CA",2),
                               ("Baz",22,"US",6),
                               ("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

Output :

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

This is my DataFrame, in which the value 4 appears twice; dropDuplicates removes the repeated value.

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+


scala> val newdf=df.dropDuplicates


scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

The program below drops duplicates across whole rows or, if you prefer, based on certain columns only:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder()
        .appName("DataFrame-DropDuplicates")
        .master("local[4]")
        .getOrCreate()

    import spark.implicits._

    // create an RDD of tuples with some data
    val custs = Seq(
      (1, "Widget Co", 120000.00, 0.00, "AZ"),
      (2, "Acme Widgets", 410500.00, 500.00, "CA"),
      (3, "Widgetry", 410500.00, 200.00, "CA"),
      (4, "Widgets R Us", 410500.00, 0.0, "CA"),
      (3, "Widgetry", 410500.00, 200.00, "CA"),
      (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
      (6, "Widget Co", 12000.00, 10.00, "AZ")
    )
    val customerRows = spark.sparkContext.parallelize(custs, 4)

    // convert RDD of tuples to DataFrame by supplying column names
    val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

    println("*** Here's the whole DataFrame with duplicates")
    customerDF.printSchema()
    customerDF.show()

    // drop fully identical rows
    val withoutDuplicates = customerDF.dropDuplicates()
    println("*** Now without duplicates")
    withoutDuplicates.show()

    val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
    println("*** Now without partial duplicates too")
    withoutPartials.show()
  }
}