如何将数组(即列表)列转换为 Vector

长话短说!

考虑下面的代码片段(假设 spark已经设置为某个 SparkSession) :

from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)

注意,温度字段是一个浮点数列表。我想把这些浮点数列表转换成 MLlib 类型的 Vector,我希望这个转换能用基本的 DataFrame API 来表示,而不是通过 RDD (这是低效的,因为它将所有数据从 JVM 发送到 Python,处理是在 Python 中完成的,我们没有得到 Spark 的 Catalyst 优化器的好处,等等)。我该怎么做?具体来说:

  1. 有什么办法能让直线石膏起作用吗?请参阅下面的详细信息(以及一次失败的变通方案尝试) ?还是说,还有其他手术能达到我想要的效果?
  2. 我在下面建议的两种替代解决方案中,哪一种更有效(UDF 与展开/重新组装列表中的项目) ?或者有没有其他几乎但并不完全正确的选择,比他们中的任何一个更好?

打石膏不管用

这就是我所期望的“适当”的解决方案。我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换。作为背景介绍,让我提醒你们一下将其转换为另一种类型的正常方法:

from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

现在,例如 df_with_strings.collect()[0]["temperatures"][1]'-7.0',但是如果我把它转换成 ml Vector,事情就不那么顺利了:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

这就产生了一个错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

哎呀! 有什么办法解决这个问题吗?

可能的选择

备选方案1: 使用 VectorAssembler

有一个 Transformer,似乎几乎理想的这项工作: VectorAssembler。它接受一个或多个列,并将它们连接到一个向量中。不幸的是,它只采用 VectorFloat列,而不是 Array列,因此下面的代码不起作用:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

它给出了这样一个错误:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

我能想到的最好的解决办法就是将列表分解成多个列,然后使用 VectorAssembler再次收集它们:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

这似乎是理想的,除了 TEMPERATURE_COUNT超过100,有时超过1000。(另一个问题是,如果事先不知道数组的大小,代码会更加复杂,尽管我的数据并非如此。)Spark 是否实际上生成了一个包含这么多列的中间数据集,或者仅仅将其视为单个项目短暂通过的中间步骤(或者当它看到这些列的唯一用途是组装成一个向量时,它确实完全优化了这个离开步骤) ?

备选方案2: 使用 UDF

一个相当简单的替代方法是使用 UDF 进行转换。这使我能够用一行代码直接表达我想要做的事情,并且不需要用大量的列来创建数据集。但是所有这些数据都必须在 Python 和 JVM 之间交换,每个数字都必须由 Python 处理(对于在单个数据项上迭代而言,这是出了名的慢)。看起来是这样的:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

不可理喻的言论

这个杂乱无章的问题的其余部分是我在试图找到答案时想到的一些额外的东西。大多数读这篇文章的人可能会忽略它们。

不是解决方案: 首先使用 Vector

在这个简单的示例中,可以首先使用向量类型创建数据,但是当然,我的数据并不是我要并行化的 Python 列表,而是从数据源读取的。但事先声明,这看起来是这样的:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

低效的解决方案: 使用 map()

一种可能性是使用 RDD map()方法将列表转换为 Vector。这与 UDF 思想相似,只是它更糟糕,因为序列化等的成本是针对每一行中的所有字段产生的,而不仅仅是正在操作的字段。郑重声明,解决方案是这样的:

df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()

对强制转换的解决方案尝试失败

在绝望中,我注意到 Vector在内部由一个包含四个字段的结构表示,但是使用来自这种结构类型的传统强制转换也不起作用。下面是一个示例(我使用 udf 构建了 struct,但是 udf 不是重要部分) :

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)

这就产生了错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
36044 次浏览

就我个人而言,我会选择 Python UDF,而不会为其他任何事情操心:

但如果你真的想要其他选择,你可以这样做:

  • 使用 Python 包装器的 Scala UDF:

    按照项目站点上的说明安装 是的

    创建结构如下的 Scala 包:

    .
    ├── build.sbt
    └── udfs.scala
    

    编辑 build.sbt(调整以反映 Scala 和 Spark 版本) :

    scalaVersion := "2.11.8"
    
    
    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % "2.4.4",
    "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )
    

    编辑 udfs.scala:

    package com.example.spark.udfs
    
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    
    object udfs {
    val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    套餐:

    sbt package
    

    以及 include (或者根据 Scala 版本等效) :

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    作为启动 shell/提交应用程序时 --driver-class-path的参数。

    在 PySpark 中定义一个包装器:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    
    def as_vector(col):
    sc = SparkContext.getOrCreate()
    f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
    

    测试:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    
    with_vec.printSchema()
    
    root
    |-- city: string (nullable = true)
    |-- temperatures: array (nullable = true)
    |    |-- element: double (containsNull = true)
    |-- vector: vector (nullable = true)
    
  • Dump data to a JSON format reflecting DenseVector schema and read it back:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    
    json_vec = to_json(struct(struct(
    lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
    col("temperatures").alias("values")
    ).alias("v")))
    
    
    schema = StructType([StructField("v", VectorUDT())])
    
    
    with_parsed_vector = df.withColumn(
    "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
    |-- city: string (nullable = true)
    |-- temperatures: array (nullable = true)
    |    |-- element: double (containsNull = true)
    |-- parsed_vector: vector (nullable = true)
    

我也遇到过和你一样的问题。 这种方法包括 RDD 转换,因此性能不是关键,但它是有效的。

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors


source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)


city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])


new_df

结果就是,

DataFrame[city: string, temperatures: vector]

对于 pypark > = 3.1.0

自3.1.0以来,有一个构建解决方案: array _ to _ Vector。

考虑到你的情况:

from pyspark.ml.functions import vector_to_array
df = df.withColumn("temperatures_vectorized", vector_to_array("temperatures"))

另外,自3.0.0以来,还有一个相反的操作: Vector _ to _ array