PySpark - rename more than one column using withColumnRenamed

I want to change the names of two columns using Spark's withColumnRenamed function. Of course, I can write:

data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
        .withColumnRenamed('x1', 'x3')
        .withColumnRenamed('x2', 'x4'))

but I want to do this in one step (having a list/tuple of the new names):

data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])

or like this:

data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))

Is it possible to do this?


It is not possible to use a single withColumnRenamed call.

  • You can use the DataFrame.toDF method*

    data.toDF('x3', 'x4')
    

    或者

    new_names = ['x3', 'x4']
    data.toDF(*new_names)
    
  • It is also possible to rename with simple select:

    from pyspark.sql.functions import col
    
    
    mapping = dict(zip(['x1', 'x2'], ['x3', 'x4']))
    data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])
    

Similarly in Scala you can:

  • Rename all columns:

    val newNames = Seq("x3", "x4")
    
    
    data.toDF(newNames: _*)
    
  • Rename from mapping with select:

    val mapping = Map("x1" -> "x3", "x2" -> "x4")
    
    
    df.select(
      df.columns.map(c => df(c).alias(mapping.get(c).getOrElse(c))): _*
    )
    

    or foldLeft + withColumnRenamed:

    mapping.foldLeft(data) {
      case (data, (oldName, newName)) => data.withColumnRenamed(oldName, newName)
    }
    

* Not to be confused with RDD.toDF, which is not a variadic function and takes column names as a list.
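
To make the footnote concrete, here is a minimal sketch of the difference (assuming an active SparkSession named spark):

    # DataFrame.toDF is variadic and takes the new names as separate arguments:
    data.toDF('x3', 'x4')

    # RDD.toDF takes the column names as a single list instead:
    rdd = spark.sparkContext.parallelize([(1, 2), (3, 4)])
    df = rdd.toDF(['x3', 'x4'])

As a side note for readers on newer releases: Spark 3.4 added DataFrame.withColumnsRenamed (note the plural), which accepts a dict of old-to-new names, so there a single call does work:

    # Requires Spark 3.4+
    data = data.withColumnsRenamed({'x1': 'x3', 'x2': 'x4'})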

I couldn't find an easy PySpark solution either, so I just built my own, similar to pandas' df.rename(columns={'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}):

import pyspark.sql.functions as F


def rename_columns(df, columns):
    if isinstance(columns, dict):
        return df.select(*[F.col(col_name).alias(columns.get(col_name, col_name)) for col_name in df.columns])
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

So your solution will look like data = rename_columns(data, {'x1': 'x3', 'x2': 'x4'}).

If you want to chain your method calls, Spark 3.0 introduced transform, which you can use like this:

my_df.transform(lambda df: rename_columns(df, {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}))

It saves me some lines of code; hope it will help you too.

Why do you want to do it in a single statement? If you print the execution plan, it is actually done in a single line anyway:

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
        .withColumnRenamed('x1', 'x3')
        .withColumnRenamed('x2', 'x4'))
data.explain()

Output:

== Physical Plan ==
*(1) Project [x1#1548L AS x3#1552L, x2#1549L AS x4#1555L]
+- Scan ExistingRDD[x1#1548L,x2#1549L]

If you want to use a list of tuples, you can use a simple map function:

import pyspark.sql.functions as F

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
new_names = [("x1", "x3"), ("x2", "x4")]
data = data.select(list(
    map(lambda old, new: F.col(old).alias(new), *zip(*new_names))
))
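
To see why this works: zip(*new_names) transposes the list of pairs into ('x1', 'x2') and ('x3', 'x4'), and map then walks both tuples in parallel, producing one F.col(old).alias(new) expression per column.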


data.explain()

It still has the same plan.

Output:

== Physical Plan ==
*(1) Project [x1#1650L AS x3#1654L, x2#1651L AS x4#1655L]
+- Scan ExistingRDD[x1#1650L,x2#1651L]

If you want to rename multiple columns, keeping the same column names but adding a prefix, this should work:

from pyspark.sql import functions as f

PREFIX = 'my_'  # whatever prefix you want (assumed here for illustration)
df.select([f.col(c).alias(PREFIX + c) for c in df.columns])

The easiest way to do this is as follows:

Explanation:

  1. Use df.columns to get all the columns in the PySpark dataframe.
  2. Build a list comprehension that loops through each column from step 1.
  3. The list will output: col("col1").alias("col1_x").
  4. *[list] will unpack the list for the select statement in PySpark.

from pyspark.sql import functions as F

(df
 .select(*[F.col(c).alias(f"{c}_x") for c in df.columns])
 .toPandas().head()
)

Hope this helps!

I have this hack in all of my Spark programs:

import pyspark


def rename_sdf(df, mapper={}, **kwargs_mapper):
    '''Rename column names of a dataframe.
    mapper: a dict mapping from the old column names to new names
    Usage:
        df.rename({'old_col_name': 'new_col_name', 'old_col_name2': 'new_col_name2'})
        df.rename(old_col_name='new_col_name')
    '''
    for before, after in mapper.items():
        df = df.withColumnRenamed(before, after)
    for before, after in kwargs_mapper.items():
        df = df.withColumnRenamed(before, after)
    return df


pyspark.sql.dataframe.DataFrame.rename = rename_sdf

Now you can easily rename any Spark dataframe the pandas way!

df.rename({'old1':'new1', 'old2':'new2'})
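
A design note on this hack: assigning rename_sdf onto pyspark.sql.dataframe.DataFrame monkey-patches the class globally, so every DataFrame in the process gains a rename method. That is convenient in a notebook, but it can surprise collaborators in a shared codebase and could clash if a future PySpark release adds its own rename attribute.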

The accepted answer by zero323 is efficient. Most of the other answers should be avoided.

Here's another efficient solution that leverages the quinn library and is well suited for production codebases:

import quinn

df = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])

def rename_col(s):
    mapping = {'x1': 'x3', 'x2': 'x4'}
    return mapping[s]

actual_df = df.transform(quinn.with_columns_renamed(rename_col))
actual_df.show()

Here's the outputted DataFrame:

+---+---+
| x3| x4|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

Let's look at the logical plans outputted by actual_df.explain(True) and verify that they're efficient:

== Parsed Logical Plan ==
'Project ['x1 AS x3#52, 'x2 AS x4#53]
+- LogicalRDD [x1#48L, x2#49L], false


== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false


== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false


== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#53L]

The parsed logical plan and physical plan are basically equal, so Catalyst isn't doing any heavy lifting to optimize the plan.

Calling withColumnRenamed multiple times should be avoided because it creates an inefficient parsed plan that needs to be optimized.

Let's look at an unnecessarily complex parsed plan:

def rename_columns(df, columns):
    for old_name, new_name in columns.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df


actual_df = rename_columns(df, {'x1': 'x3', 'x2': 'x4'})
actual_df.explain(True)

== Parsed Logical Plan ==
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false


== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false


== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
+- LogicalRDD [x1#48L, x2#49L], false


== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
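
Note that the Optimized Logical Plan collapses the two projections back into a single one, so the executed physical plan ends up identical; the cost of repeated withColumnRenamed calls is paid in analysis and optimization time, which grows with the number of calls.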

You can also use a dictionary to iterate over the columns you want to rename.

Sample:

# Note: this dict maps new_name -> old_name, because withColumnRenamed
# receives (old, new) = (value, key) below.
a_dict = {'sum_gb': 'sum_mbUsed', 'number_call': 'sum_call_date'}

for key, value in a_dict.items():
    df = df.withColumnRenamed(value, key)

You could use the following function:

def spark_rename_from_dict(df, rename_dict):
    newcols = [rename_dict.get(i, i) for i in df.columns]
    return df.toDF(*newcols)

Here your rename dict is a mapping over a subset of df.columns. This approach is recommended since it does not create multiple dataframes.
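
For completeness, a hypothetical usage sketch, reusing the question's DataFrame (the spark session and column names are assumed from above):

    # Hypothetical usage; this relies on the function returning the renamed
    # DataFrame, as in the corrected version above.
    data = spark.createDataFrame([(1, 2), (3, 4)], ['x1', 'x2'])
    data = spark_rename_from_dict(data, {'x1': 'x3', 'x2': 'x4'})
    data.show()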