如何在 PySpark 中将数据框列从 String 类型更改为 Double 类型?

我有一个数据框,列为 String。 我想在 PySpark 中将列类型更改为 Double 类型。

我是这样做的:

toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

只是想知道,这是正确的方式做到这一点,因为在跑步 通过 Logit模型,我得到了一些错误,所以我想知道, 这就是麻烦的原因吗。

365109 次浏览

解决办法很简单

toDoublefunc = UserDefinedFunction(lambda x: float(x),DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

这里不需要 UDF。 Column已经为 cast方法提供了 DataType 例子:

from pyspark.sql.types import DoubleType


changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))

或短绳:

changedTypedf = joindf.withColumn("label", joindf["show"].cast("double"))

其中规范字符串名称(也可以支持其他变体)对应于 simpleString值:

from pyspark.sql import types


for t in ['BinaryType', 'BooleanType', 'ByteType', 'DateType',
'DecimalType', 'DoubleType', 'FloatType', 'IntegerType',
'LongType', 'ShortType', 'StringType', 'TimestampType']:
print(f"{t}: {getattr(types, t)().simpleString()}")
BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

例如复杂类型

types.ArrayType(types.IntegerType()).simpleString()
'array<int>'
types.MapType(types.StringType(), types.IntegerType()).simpleString()
'map<string,int>'

保留列的名称,并通过使用与输入列相同的名称来避免额外的列添加:

from pyspark.sql.types import DoubleType
changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))

给出的答案足以解决这个问题,但我想分享另一种方式,可能会引入新版本的 Spark (我不确定),所以给出的答案没有抓住它。

我们可以用 col("colum_name")关键字到达火花语句的列:

from pyspark.sql.functions import col
changedTypedf = joindf.withColumn("show", col("show").cast("double"))

PySpark 版本:

df = <source data>
df.printSchema()


from pyspark.sql.types import *


# Change column type
df_new = df.withColumn("myColumn", df["myColumn"].cast(IntegerType()))
df_new.printSchema()
df_new.select("myColumn").show()

其他答案的一个问题(取决于 Pypark 的版本)是 withColumn的使用。至少在 v2.4.4中已经观察到了性能问题(请参阅此 线)。火花博士提到了 withColumn:

这种方法在内部引入了投影。因此,多次调用它,例如,通过循环添加多个列可能会产生大的计划,这可能会导致性能问题,甚至 StackOverflow 异常。为了避免这种情况,请同时使用 select 和多个列。

一般而言,实现建议使用 select的一种方法是:

from pyspark.sql.types import *
from pyspark.sql import functions as F


cols_to_fix = ['show']
other_cols = [col for col in joindf.columns if not col in cols_to_fix]
joindf = joindf.select(
*other_cols,
F.col('show').cast(DoubleType())
)