Updating a DataFrame column in Spark

Looking at the new Spark DataFrame API, it is unclear whether it is possible to modify DataFrame columns.

How would I go about changing a value in row x, column y of a DataFrame?

In pandas, this would be:

df.ix[x,y] = new_value
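(Note: .ix has since been removed from pandas; the equivalent cell assignment in current pandas is:)

df.loc[x, y] = new_value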

Edit: Consolidating what was said below: you can't modify the existing DataFrame, as it is immutable, but you can return a new DataFrame with the desired modifications.

If you just want to replace a value in a column based on a condition, like np.where:

from pyspark.sql import functions as F


update_func = (F.when(F.col('update_col') == replace_val, new_value)
               .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
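
For concreteness, here's a minimal runnable sketch of the pattern above; the sample data, replace_val, and new_value are invented for illustration:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('a',), ('b',), ('a',)], ['update_col'])

replace_val, new_value = 'a', 'z'
update_func = (F.when(F.col('update_col') == replace_val, new_value)
               .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
# new_column_name holds 'z', 'b', 'z' for the three rows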

If you want to perform some operation on a column and create a new column that is added to the DataFrame:

import pyspark.sql.functions as F
import pyspark.sql.types as T


def my_func(col):
    # do stuff to the column value here and compute transformed_value
    return transformed_value


# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())


df = df.withColumn('new_column_name', my_udf('update_col'))

If you want the new column to have the same name as the old column, you can add an additional step:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
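
In recent PySpark versions, F.udf is the usual way to wrap the function, and passing the existing column name to withColumn overwrites the column in one step, so the drop/rename dance isn't needed. A sketch, assuming my_func returns a string as above:

import pyspark.sql.functions as F
import pyspark.sql.types as T

my_udf = F.udf(my_func, T.StringType())
# reusing the name replaces the column in the returned DataFrame
df = df.withColumn('update_col', my_udf('update_col'))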

DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements in place. To change values, you will need to create a new DataFrame by transforming the original one, either using the SQL-like DSL or RDD operations like map.
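
A minimal sketch of that idea (the SparkSession setup, column name, and values are just for illustration):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ['x'])

df2 = df.withColumn('x', F.col('x') + 1)  # a new DataFrame with x incremented
# df itself is unchanged; df2 carries the modification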

A highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science.

While you cannot modify a column as such, you may operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction implementing the operation to apply and then selectively apply that function to the targeted column only. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType


name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column
                         for column in old_df.columns])

new_df now has the same schema as old_df (assuming that old_df.target_column was of type StringType as well) but all values in column target_column will be new_value.
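
In current PySpark, an equivalent and shorter sketch uses withColumn with a literal; reusing the existing column name replaces that column in the returned DataFrame (same old_df and target_column assumed as above):

from pyspark.sql import functions as F

new_df = old_df.withColumn('target_column', F.lit('new_value'))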

Just as maasg says, you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame df with two rows:

val newDf = sqlContext.createDataFrame(df.map(row =>
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")))), df.schema)

Note that if the types of the columns change, you need to give it a correct schema instead of df.schema. (In Spark 2.x and later, where df.map returns a Dataset, go through df.rdd.map and pass the result to spark.createDataFrame.) Check out the API of org.apache.spark.sql.Row for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] Or using UDFs in Scala:

import org.apache.spark.sql.functions._


val toLong = udf[Long, String](_.toLong)


val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

and if the column name needs to stay the same you can rename it back:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

Commonly when updating a column, we want to map an old value to a new value. Here's a way to do that in pyspark without UDFs:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
                   F.when(df[update_col] == old_value, new_value)
                    .otherwise(df[update_col]))
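
If several old values need remapping at once, the same when/otherwise pattern chains. A sketch that folds a hypothetical mapping dict into one column expression (the mapping values are invented):

from pyspark.sql import functions as F

mapping = {'old_a': 'new_a', 'old_b': 'new_b'}  # hypothetical old -> new values

expr = None
for old, new in mapping.items():
    cond = F.col(update_col) == old
    expr = F.when(cond, new) if expr is None else expr.when(cond, new)
# values not covered by the mapping pass through unchanged
df = df.withColumn(update_col, expr.otherwise(F.col(update_col)))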

Importing col and when from pyspark.sql.functions, and updating the fifth column to an integer (0, 1, 2) based on its string value ("string a", "string b", "string c"), into a new DataFrame:

from pyspark.sql.functions import col, when


data_frame_temp = data_frame.withColumn(
    "col_5",
    when(col("col_5") == "string a", 0)
    .when(col("col_5") == "string b", 1)
    .otherwise(2))
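
End to end, with a throwaway DataFrame whose rows are invented for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder.getOrCreate()
data_frame = spark.createDataFrame(
    [(1, "string a"), (2, "string b"), (3, "string c")],
    ["col_1", "col_5"])

data_frame_temp = data_frame.withColumn(
    "col_5",
    when(col("col_5") == "string a", 0)
    .when(col("col_5") == "string b", 1)
    .otherwise(2))
data_frame_temp.show()  # col_5 becomes 0, 1, 2 respectively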