如何在Spark DataFrame中添加常量列?

我想在DataFrame中添加一个具有任意值的列(对于每一行都是相同的)。当我使用withColumn时,出现错误,如下所示:

dt.withColumn('new_column', 10).head(5)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
1 dt = (messages
2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)


/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
1167         """
-> 1168         return self.select('*', col.alias(colName))
1169
1170     @ignore_unicode_prefix


AttributeError: 'int' object has no attribute 'alias'

我似乎可以通过加减其他列中的一列(因此它们加到零),然后加上我想要的数字(本例中为10),来欺骗函数按照我想要的方式工作:

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

这是超级黑客,对不对?我想有更合法的方法来做这件事?

288857 次浏览

Spark 2.2+

Spark 2.2 introduces typedLit to support Seq, Map, and Tuples (SPARK-19254) and following calls should be supported (Scala):

import org.apache.spark.sql.functions.typedLit


df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map):

The second argument for DataFrame.withColumn should be a Column so you have to use a literal:

from pyspark.sql.functions import lit


df.withColumn('new_column', lit(10))

If you need complex columns you can build these using blocks like array:

from pyspark.sql.functions import array, create_map, struct


df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Exactly the same methods can be used in Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}


df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

To provide names for structs use either alias on each field:

df.withColumn(
"some_struct",
struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
)

or cast on the whole object

df.withColumn(
"some_struct",
struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
)

It is also possible, although slower, to use an UDF.

Note:

The same constructs can be used to pass constant arguments to UDFs or SQL functions.

In spark 2.2 there are two ways to add constant value in a column in DataFrame:

1) Using lit

2) Using typedLit.

The difference between the two is that typedLit can also handle parameterized scala types e.g. List, Seq, and Map

Sample DataFrame:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")


+---+----+
| id|col1|
+---+----+
|  0|   a|
|  1|   b|
+---+----+

1) Using lit: Adding constant string value in new column named newcol:

import org.apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))

Result:

+---+----+------+
| id|col1|newcol|
+---+----+------+
|  0|   a| myval|
|  1|   b| myval|
+---+----+------+

2) Using typedLit:

import org.apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))

Result:

+---+----+-----------------+
| id|col1|           newcol|
+---+----+-----------------+
|  0|   a|[sample,10,0.044]|
|  1|   b|[sample,10,0.044]|
|  2|   c|[sample,10,0.044]|
+---+----+-----------------+

As the other answers have described, lit and typedLit are how to add constant columns to DataFrames. lit is an important Spark function that you will use frequently, but not for adding constant columns to DataFrames.

You'll commonly be using lit to create org.apache.spark.sql.Column objects because that's the column type required by most of the org.apache.spark.sql.functions.

Suppose you have a DataFrame with a some_date DateType column and would like to add a column with the days between December 31, 2020 and some_date.

Here's your DataFrame:

+----------+
| some_date|
+----------+
|2020-09-23|
|2020-01-05|
|2020-04-12|
+----------+

Here's how to calculate the days till the year end:

val diff = datediff(lit(Date.valueOf("2020-12-31")), col("some_date"))
df
.withColumn("days_till_yearend", diff)
.show()
+----------+-----------------+
| some_date|days_till_yearend|
+----------+-----------------+
|2020-09-23|               99|
|2020-01-05|              361|
|2020-04-12|              263|
+----------+-----------------+

You could also use lit to create a year_end column and compute the days_till_yearend like so:

import java.sql.Date


df
.withColumn("yearend", lit(Date.valueOf("2020-12-31")))
.withColumn("days_till_yearend", datediff(col("yearend"), col("some_date")))
.show()
+----------+----------+-----------------+
| some_date|   yearend|days_till_yearend|
+----------+----------+-----------------+
|2020-09-23|2020-12-31|               99|
|2020-01-05|2020-12-31|              361|
|2020-04-12|2020-12-31|              263|
+----------+----------+-----------------+

Most of the time, you don't need to use lit to append a constant column to a DataFrame. You just need to use lit to convert a Scala type to a org.apache.spark.sql.Column object because that's what's required by the function.

See the datediff function signature:

enter image description here

As you can see, datediff requires two Column arguments.