使用 groupby 设置或收集列表

如何在 groupby之后的数据帧上使用 collect_setcollect_list。例如: df.groupby('key').collect_set('values')。我得到一个错误: < code > AttributeError: ‘ GroupedData’对象没有属性‘ Collection _ set’

145430 次浏览

您需要使用 agg。示例:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F


sc = SparkContext("local")


sqlContext = HiveContext(sc)


df = sqlContext.createDataFrame([
("a", None, None),
("a", "code1", None),
("a", "code2", "name2"),
], ["id", "code", "name"])


df.show()


+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

注意在上面你必须创建一个 HiveContext。参见 https://stackoverflow.com/a/35529093/690430处理不同的 Spark 版本。

(df
.groupby("id")
.agg(F.collect_set("code"),
F.collect_list("name"))
.show())


+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+

如果你的数据帧很大,你可以尝试使用 熊猫(GROUPED _ AGG)来避免内存错误。

分组聚合熊猫 UDF 类似于 Spark 聚合函数。分组聚合熊猫 UDF 与 groupBy ()一起使用。和火花。窗户。它定义来自一个或多个熊猫的聚合。序列为标量值,其中每个熊猫。序列表示组或窗口中的列。熊猫

例如:

import pyspark.sql.functions as F


@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
return ', '.join(name)


grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))