如何在 Spark SQL 的 DataFrame 中更改列类型?

假设我这样做:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()


root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)


df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

但我真的希望 year作为 Int(也许转换一些其他列)。

我能想到的最好的办法就是

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

这有点复杂。

我来自 R,我习惯于写作,例如。

df2 <- df %>%
mutate(year = year %>% as.integer,
make = make %>% toupper)

我可能遗漏了一些东西,因为在 Spark/Scala 中应该有更好的方法来实现这一点... ..。

544165 次浏览

2016年3月: 感谢投票!虽然这不是最好的解决方案,但是我认为基于 msemelman,Martin Senne 等人提出的 withColumnwithColumnRenamedcast的解决方案更简单、更清晰。

我认为您的方法是可以的,回想一下 Spark DataFrame是一个(不可变的) RDD 的行,所以我们从来没有真正的 替换一列,只是创建新的 DataFrame每次与一个新的模式。

假设您有一个具有以下模式的原始 df:

scala> df.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Distance: string (nullable = true)
|-- CRSDepTime: string (nullable = true)

一些 UDF 定义在一列或多列上:

import org.apache.spark.sql.functions._


val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
(year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
)

更改列类型,甚至从另一个列构建新的 DataFrame,可以这样编写:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))
.withColumn("month",          toInt(df("Month")))
.withColumn("distance",       toDouble(df("Distance")))
.withColumn("nearestHoliday", days_since_nearest_holidays(
df("Year"), df("Month"), df("DayofMonth"))
)
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
"month", "distance", "nearestHoliday")

结果是:

scala> df.printSchema
root
|-- departureDelay: double (nullable = true)
|-- departureHour: integer (nullable = true)
|-- dayOfWeek: integer (nullable = true)
|-- dayOfMonth: integer (nullable = true)
|-- month: integer (nullable = true)
|-- distance: double (nullable = true)
|-- nearestHoliday: integer (nullable = true)

这很接近你自己的解决方案。简单地说,将类型更改和其他转换保持为单独的 udf val可以使代码更具可读性和可重用性。

你可以使用 selectExpr使它更干净一点:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
"model", "comment", "blank")

要将年份从 string 转换为 int,可以向 csv 阅读器添加以下选项: “ effesSchema”-> “ true”,参见 DataBricks 文档

首先 ,如果您想要强制类型转换,那么这样:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

使用相同的列名时,该列将被替换为新的列。您不需要执行添加和删除步骤。

第二个 ,关于 斯卡拉R
这是与 R I 最相似的代码:

val df2 = df.select(
df.columns.map {
case year @ "year" => df(year).cast(IntegerType).as(year)
case make @ "make" => functions.upper(df(make)).as(make)
case other         => df(other)
}: _*
)

虽然代码长度比 R 长一点。这与语言的冗长无关。在 R 中,mutate是 R 数据帧的一个特殊函数,而在 Scala 中,由于其表达能力,你可以很容易地对其进行特殊处理。
总而言之,它避免了特定的解决方案,因为语言设计足够好,可以快速简单地构建自己的领域语言。


旁注: 令人惊讶的是,df.columnsArray[String]而不是 Array[Column],也许他们想让它看起来像 Python 熊猫的数据帧。

由于 cast操作可用于 Spark Column的操作(而且我个人并不支持@Svend提出的 udf操作) ,不如:

df.select( df("year").cast(IntegerType).as("year"), ... )

转换成所要求的类型?作为一个整洁的副作用,值不浇铸/“可转换”在这个意义上,将成为 null

如果你需要这个作为 辅助方法,使用:

object DFHelper{
def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
df.withColumn( cn, df(cn).cast(tpe) )
}
}

用法如下:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )

编辑: 最新的最新版本

因为在使用 Scala 的时候,你应该使用数据集 api [1] ,点击这里查看文档:

Https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/dataset.html#withcolumn (colName: String,col.apache.spot.sql. Column) : org.apache.spot.sql. DataFrame

如果使用 python,尽管更容易,我还是把这个链接留在这里,因为这是一个经过高度评价的问题:

Https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]

[1] https://spark.apache.org/docs/latest/sql-programming-guide.html:

在 Scala API 中,DataFrame 只是 Dataset [ Row ]的一个类型别名。 而在 JavaAPI 中,用户需要使用数据集来表示 数据框架。

编辑: 最新版本

由于火花2. x 你可以使用 .withColumn。检查这里的文档:

Https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql : dataset@withColumn (colName: String,col.apache.parks.sql. Column) : org.apache.parks.sql. DataFrame

最老套的答案

由于 Spark 版本1.4,您可以在列上应用带有 DataType 的 cast 方法:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
.drop("year")
.withColumnRenamed("yearTmp", "year")

如果使用 sql 表达式,还可以:

val df2 = df.selectExpr("cast(year as int) year",
"make",
"model",
"comment",
"blank")

更多信息请查看文档: Http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql 数据框架

因此,只有在保存到 sqlserver 这样的 jdbc 驱动程序时出现问题时,这种方法才能真正起作用,但是对于在语法和类型方面遇到的错误,这种方法真的很有帮助。

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")


override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
//      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
}
}


JdbcDialects.registerDialect(SQLServerDialect)

用于将 DataFrame 的数据类型从 String 修改为 Integer 的 Java 代码

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

它将简单地将现有的(String 数据类型)强制转换为 Integer。

建议使用铸造的答案,仅供参考,火花1.4.1中的铸造方法是错误的。

例如,一个字符串列的值为“8182175552014127960”的数据框在转换为 bigint 时的值为“818217552014128100”

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+


df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

在发现这个 bug 之前,我们必须面对很多问题,因为我们在生产环境中使用了 bigint 列。

df.select($"long_col".cast(IntegerType).as("int_col"))
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
//Schema to be applied to the table
val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)


val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()

可以通过使用 Sparksql 中的强制转换来更改列的数据类型。 Table name 是 table,它有两列,只有 column1和 column2以及 Column1数据类型需要更改。 Sql (“ select cast (column1 as Double) column n1NewName,column n2 from table”) 代替双写数据类型。

此方法将删除旧列并创建具有相同值和新数据类型的新列。创建 DataFrame 时,我的原始数据类型是:-

root
|-- id: integer (nullable = true)
|-- flag1: string (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag3: string (nullable = true)

在此之后,我运行以下代码来更改数据类型:-

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

在这之后,我的结果是:-

root
|-- id: integer (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag1: boolean (nullable = true)
|-- flag3: boolean (nullable = true)

您可以使用以下代码。

df.withColumn("year", df("year").cast(IntegerType))

列转换为 IntegerType列。

生成一个包含5个值的简单数据集,并将 int转换为 string类型:

val df = spark.range(5).select( col("id").cast("string") )

另一种方式:

// Generate a simple dataset containing five values and convert int to string type


val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")

使用 Spark Sql2.4.0可以做到这一点:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")

如果您必须根据列名重命名几十个列,那么下面的示例采用@dnlbrky 方法,并将其同时应用于多个列:

df.selectExpr(df.columns.map(cn => {
if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
else cn
}):_*)

未转换的列保持不变。所有列保持原来的顺序。

另一种解决办法如下:

1)保持“推理模式”为假

2)在行上运行‘ Map’函数时,可以读取‘ asString’(row.getString...)

//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema","false")
.load(args[0]);


JavaRDD<Box> vertices = enginesDataSet
.select("BOX","BOX_CD")
.toJavaRDD()
.map(new Function<Row, Box>() {
@Override
public Box call(Row row) throws Exception {
return new Box((String)row.getString(0),(String)row.get(1));
}
});

我觉得这对我来说更容易理解。

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

这将创建任何临时列并删除这些列,从而将年份列转换为 IntegerType。 如果要转换为其他数据类型,可以检查 org.apache.spark.sql.types包中的类型。

答案太多,解释太少

以下语法适用于使用带 Spark 2.4的 Databricks 笔记本

from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))

注意,您必须指定条目格式(在我的示例中为“ MM-dd-yyyy”) ,并且导入是强制的,因为 to _ date 是一个 park sql 函数

还尝试了这种语法,但得到的是 null 而不是正确的强制转换:

df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(注意,我必须使用括号和引号才能保证语法正确) < br > < br >
PS: < em > 我不得不承认这就像一个语法丛林,入口点有很多可能的方式,官方的 API 引用缺乏适当的例子。

如果希望将特定类型的多个列更改为另一个类型而不指定单个列名,则使用

/* Get names of all columns that you want to change type.
In this example I want to change all columns of type Array to String*/
val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)


//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}


//display


updatedDataFrame.show(truncate = false)

为什么不按照 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast的描述做呢

df.select(df.year.cast("int"),"make","model","comment","blank")