Add an empty column to a Spark DataFrame

As mentioned in many other places on the web, adding a new column to an existing DataFrame is not straightforward. Unfortunately, it is important to have this capability (even though it is inefficient in a distributed environment), particularly when trying to concatenate two DataFrames using unionAll.

What is the most elegant way to add a null column to a DataFrame in order to facilitate a unionAll?

My version goes like this:

from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction

# A UDF that ignores its input and always returns None
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(old_df['any_col_from_old']))
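For context, a minimal sketch of the unionAll scenario this is meant to enable, with hypothetical DataFrames df_a (columns id, name, extra) and df_b (columns id, name):

# Pad df_b with a null 'extra' column using the UDF above
df_b_padded = df_b.withColumn('extra', to_none(df_b['id']))

# unionAll matches columns by position, so align the order first
combined = df_a.unionAll(df_b_padded.select(df_a.columns))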

All you need here is to import StringType and use lit and cast:

from pyspark.sql.types import StringType
from pyspark.sql.functions import lit


new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))

A full example:

from pyspark.sql import Row

row = Row("foo", "bar")  # named Row constructor
df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
df.printSchema()
# root
#  |-- foo: long (nullable = true)
#  |-- bar: string (nullable = true)


new_df = df.withColumn('new_column', lit(None).cast(StringType()))


new_df.printSchema()
# root
#  |-- foo: long (nullable = true)
#  |-- bar: string (nullable = true)
#  |-- new_column: string (nullable = true)


new_df.show()
# +---+---+----------+
# |foo|bar|new_column|
# +---+---+----------+
# |  1|  2|      null|
# |  2|  3|      null|
# +---+---+----------+
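Building on this, a hedged sketch of how the same idiom could be generalized for the unionAll use case: given two hypothetical DataFrames df_a and df_b, add every column missing from df_b as a typed null, then union by position.

from pyspark.sql.functions import lit

# Add each column that exists in df_a but not in df_b as a typed null
for field in df_a.schema.fields:
    if field.name not in df_b.columns:
        df_b = df_b.withColumn(field.name, lit(None).cast(field.dataType))

# unionAll resolves columns by position, so align the order first
combined = df_a.unionAll(df_b.select(df_a.columns))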

A Scala equivalent can be found here: Create new Dataframe with empty/null field values

I would cast lit(None) to NullType instead of StringType, so that if we ever have to filter the null or non-null rows on that column, it can easily be done as follows:

from pyspark.sql import Row
from pyspark.sql.functions import lit, col
from pyspark.sql.types import NullType

row = Row("foo", "bar")
df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()


new_df = df.withColumn('new_column', lit(None).cast(NullType()))


new_df.printSchema()


df_null = new_df.filter(col("new_column").isNull()).show()
df_non_null = new_df.filter(col("new_column").isNotNull()).show()

Also be careful not to use lit("None") (with quotes) when casting to StringType, since searching for records with the filter condition .isNull() on col("new_column") would then fail.
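A quick sketch of that failure mode, reusing df from the example above:

from pyspark.sql.functions import lit, col

# lit("None") stores the four-character string "None", not a SQL NULL,
# so isNull() matches nothing
bad_df = df.withColumn('new_column', lit("None").cast('string'))
bad_df.filter(col("new_column").isNull()).count()   # 0
bad_df.filter(col("new_column") == "None").count()  # 2 (every row)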

The option without importing StringType:

df = df.withColumn('foo', F.lit(None).cast('string'))

Full example:

from pyspark.sql import functions as F
df = spark.range(1, 3).toDF('c')


df = df.withColumn('foo', F.lit(None).cast('string'))


df.printSchema()
# root
#  |-- c: long (nullable = false)
#  |-- foo: string (nullable = true)


df.show()
# +---+----+
# |  c| foo|
# +---+----+
# |  1|null|
# |  2|null|
# +---+----+
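The same DDL-style type names should work for other types as well; a small sketch:

df = df.withColumn('n', F.lit(None).cast('int'))
df = df.withColumn('ts', F.lit(None).cast('timestamp'))
df.printSchema()
# root
#  |-- c: long (nullable = false)
#  |-- foo: string (nullable = true)
#  |-- n: integer (nullable = true)
#  |-- ts: timestamp (nullable = true)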