如何向 Spark DataFrame 添加新列(使用 PySpark) ?

我有一个 Spark DataFrame (使用 PySpark 1.5.1) ,希望添加一个新列。

我尝试过以下方法,但没有成功:

type(randomed_hours) # => list


# Create in Python and transform to RDD


new_col = pd.DataFrame(randomed_hours, columns=['new_col'])


spark_new_col = sqlContext.createDataFrame(new_col)


my_df_spark.withColumn("hours", spark_new_col["new_col"])

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

那么,如何使用 PySpark 向现有 DataFrame 添加一个新列(基于 Python 向量)呢?

454253 次浏览

不能在 Spark 中向 DataFrame添加任意列。只能使用文字创建新列(其他文字类型在 如何在 Spark DataFrame 中添加常量列?中描述)

from pyspark.sql.functions import lit


df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))


df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()


## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

改造现有的栏目:

from pyspark.sql.functions import exp


df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()


## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

包括使用 join:

from pyspark.sql.functions import exp


lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))


## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

或使用函数/udf 生成:

from pyspark.sql.functions import rand


df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()


## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

在性能方面,映射到 Catalyst 表达式的内置函数(pyspark.sql.functions)通常优于 Python 用户定义的函数。

如果您想将任意 RDD 的内容添加为列,则可以

  • 加入 现有数据框架的行号
  • 在 RDD 上调用 zipWithIndex并将其转换为数据帧
  • 使用 index 作为连接键将两者连接起来

使用 UDF 添加列:

df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))


from pyspark.sql.functions import udf
from pyspark.sql.types import *


def valueToCategory(value):
if   value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'


# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()


## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

对于 Spark 2.0

# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))

在添加 column_name时,您可以定义一个新的 udf:

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))

我想为一个非常类似的用例提供一个通用的例子:

用例: 我有一个由以下内容组成的 csv:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

我需要执行一些转换,最终的 csv 需要看起来像

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

我需要这样做是因为这是由某个模型定义的模式,我需要我的最终数据能够与 SQL Bulk Inserts 之类的东西互操作。

所以:

1)我读取了原来的 csv 使用火花。阅读,并把它称为“ df”。

2)我对数据做了一些处理。

3)我使用这个脚本添加空列:

outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))


df = df.select(outcols)

通过这种方式,您可以在加载 csv 之后对模式进行结构化(如果需要对许多表进行重新排序,也可以对列进行重新排序)。

添加列的最简单方法是使用“ withColumn”。由于数据框架是使用 sqlContext 创建的,因此必须指定模式,或者默认情况下可以在数据集中使用。如果指定了架构,则每次更改时工作负载都会变得单调乏味。

下面是一个你可以考虑的例子:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default


# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")


# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")


# Check the change
Data.printSchema()

我们可以通过以下步骤直接向 DataFrame 添加其他列:

from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()

有多种方法可以在 pySpark 中添加新列。

让我们首先创建一个简单的 DataFrame。

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

现在让我们尝试将列值加倍,并将其存储在一个新列中。PFB 几种不同的方法实现同样的效果。

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()


# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()


# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()


# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

如果您想了解更多的例子和有关 SparkDataFrame 函数的说明,请访问我的 博客

希望这个能帮上忙。

添加具有某些自定义值或动态值计算的新列,这些自定义值或动态值计算将基于现有列进行填充。

例如:。

|ColumnA | ColumnB |
|--------|---------|
| 10     | 15      |
| 10     | 20      |
| 10     | 30      |

和新的 ColumnC 作为 ColumnA + ColumnB

|ColumnA | ColumnB | ColumnC|
|--------|---------|--------|
| 10     | 15      | 25     |
| 10     | 20      | 30     |
| 10     | 30      | 40     |

使用

#to add new column
def customColumnVal(row):
rd=row.asDict()
rd["ColumnC"]=row["ColumnA"] + row["ColumnB"]
    

new_row=Row(**rd)
return new_row


#convert DF to RDD
df_rdd= input_dataframe.rdd


#apply new fucntion to rdd
output_dataframe=df_rdd.map(customColumnVal).toDF()

input_dataframe是将被修改的数据框架,customColumnVal函数有添加新列的代码。