如何在数据集中存储自定义对象?

根据 介绍 Spark 数据集:

当我们期待 Spark 2.0时,我们计划对数据集进行一些令人兴奋的改进,具体如下: ... 自定义编码器-虽然我们目前自动生成各种类型的编码器,我们希望为自定义对象打开一个 API。

并试图在 Dataset中存储自定义类型会导致以下错误:

无法找到存储在数据集中的类型的编码器。基本类型(Int、 String 等)和 Product 类型(case 类)都可以通过导入 sqlContext. 得到支持。_ 支持序列化其他类型将在以后的版本中添加

或:

异常: 没有找到... 的编码器。

现在有什么解决办法吗?


请注意,这个问题只作为 Community Wiki 答案的入口点存在。请随时更新/改进问题和答案。

92984 次浏览
  1. 使用通用编码器。

    目前有两种通用编码器 kryojavaSerialization,后者被明确描述为:

    效率极低,只能作为最后的手段使用。

    假设下面的类

    class Bar(i: Int) {
    override def toString = s"bar $i"
    def bar = i
    }
    

    您可以通过添加隐式编码器来使用这些编码器:

    object BarEncoders {
    implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] =
    org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    可以一起使用如下:

    object Main {
    def main(args: Array[String]) {
    val sc = new SparkContext("local",  "test", new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    import BarEncoders._
    
    
    val ds = Seq(new Bar(1)).toDS
    ds.show
    
    
    sc.stop()
    }
    }
    

    它将对象存储为 binary列,因此当转换为 DataFrame时,您将得到以下模式:

    root
    |-- value: binary (nullable = true)
    

    也可以使用 kryo编码器对特定字段的元组进行编码:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    请注意,我们这里不依赖于隐式编码器,而是显式地传递编码器,所以这很可能不适用于 toDS方法。

  2. 使用隐式转换:

    在可编码的表示和自定义类之间提供隐式转换,例如:

    object BarConversions {
    implicit def toInt(bar: Bar): Int = bar.bar
    implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    
    object Main {
    def main(args: Array[String]) {
    val sc = new SparkContext("local",  "test", new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    import BarConversions._
    
    
    type EncodedBar = Int
    
    
    val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
    val barsDS = bars.toDS
    
    
    barsDS.show
    barsDS.map(_.bar).show
    
    
    sc.stop()
    }
    }
    

Related questions:

编码器工作在 Spark2.0或多或少相同。和 Kryo仍然是推荐的 serialization的选择。

你可以看下面的例子与火花壳

scala> import spark.implicits._
import spark.implicits._


scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders


scala> case class NormalPerson(name: String, age: Int) {
|   def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class NormalPerson


scala> case class ReversePerson(name: Int, age: String) {
|   def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class ReversePerson


scala> val normalPersons = Seq(
|   NormalPerson("Superman", 25),
|   NormalPerson("Spiderman", 17),
|   NormalPerson("Ironman", 29)
| )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))


scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]


scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]


scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+


scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+


scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.


scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]


scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

到目前为止,在目前的范围内没有 appropriate encoders,所以我们的人没有编码为 binary值。但是,一旦我们使用 Kryo序列化提供一些 implicit编码器,这种情况就会改变。

// Provide Encoders


scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]


scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]


// Ecoders will be used since they are now present in Scope


scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]


scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]


// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+


scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+


// Our instances still work as expected


scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.


scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

更新

这个答案仍然是有效的和信息丰富的,尽管自2.2/2.3以来情况有所改善,它增加了对 SetSeqMapDateTimestampBigDecimal的内置编码器支持。如果您坚持只使用 case 类和通常的 Scala 类型来创建类型,那么只使用 SQLImplicits中的隐式类型应该没有问题。


不幸的是,实际上没有添加任何东西来帮助解决这个问题。在 Encoders.scalaSQLImplicits.scala中搜索 @since 2.0.0可以找到主要与基本类型有关的内容(以及对 case 类的一些调整)。那么,首先要说的是: 目前对自定义类编码器还没有很好的支持。除去这些,接下来是一些技巧,考虑到我们目前可以支配的东西,这些技巧可以做到我们所希望的那样好。作为一个前期免责声明: 这不会完美的工作,我会尽我所能,使所有的限制明确和前期。

到底是什么问题

当你想创建一个数据集时,Spark“需要一个编码器(将一个 T 类型的 JVM 对象转换成内部 Spark SQL 表示) ,这个编码器通常是通过 SparkSession的隐式创建的,或者可以通过调用 Encoders上的静态方法来显式创建”(取自 createDataset上的文件)。编码器将采用 Encoder[T]的形式,其中 T是您正在编码的类型。第一个建议是添加 import spark.implicits._(它提供了 这些隐式编码器) ,第二个建议是使用与编码器相关的 这个函数集显式传递隐式编码器。

没有可用于常规类的编码器,所以

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

会显示以下与编译时间有关的隐式错误:

无法找到存储在数据集中的类型的编码器。基本类型(Int、 String 等)和 Product 类型(case 类)都可以通过导入 sqlContext. 得到支持。_ 支持序列化其他类型将在以后的版本中添加

但是,如果在某个扩展了 Product的类中包装刚才用来获得上述错误的类型,错误就会延迟到运行时,所以呢

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

编译正常,但在运行时失败

异常: 没有找到 MyObj 的编码器

原因是 Spark 创建的隐式编码器实际上只是在运行时(通过 scala 反射)创建的。在这种情况下,所有 Spark 在编译时检查的都是最外层的类扩展了 Product(所有 case 类都是这样做的) ,并且只有在运行时才意识到它仍然不知道如何处理 MyObj(如果我尝试让一个 Dataset[(Int,MyObj)]-Spark 等到运行时才在 MyObj上吐出来,也会发生同样的问题)。这些是迫切需要解决的核心问题:

  • 一些扩展 Product编译的类尽管在运行时总是崩溃,并且
  • 没有办法为嵌套类型传递自定义编码器(我没有办法为 Spark 提供一个只有 MyObj的编码器,这样它就知道如何编码 Wrap[MyObj](Int,MyObj))。

kryo

大家建议的解决方案是使用 kryo编码器。

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

不过很快就会变得很无聊。特别是当你的代码正在处理各种各样的数据集,加入,分组等。你最终会得到一堆额外的暗示。那么,为什么不直接使用一个隐式函数来自动完成这些操作呢?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)

现在,我似乎可以做几乎任何我想做的事情(下面的例子不会在 spark-shell中工作,其中 spark.implicits._是自动导入的)

class MyObj(val i: Int)


val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者差不多。问题是,使用 kryo导致 Spark 只是将数据集中的每一行存储为一个平面二进制对象。对于 mapfilterforeach这就足够了,但是对于像 join这样的操作,Spark 确实需要将它们分成列。检查 d2d3的模式,您会发现只有一个二进制列:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

元组的部分解

因此,使用 Scala 中的隐式魔法(6.26.3超载分辨率中的更多内容) ,我可以为自己创建一系列的隐式,它们将尽可能地完成工作,至少对于元组来说,并且能够很好地处理现有的隐式:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits


implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)


implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)


implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)


// ... you can keep making these

然后,利用这些隐含的内容,我可以使上面的示例起作用,尽管需要对列进行一些重命名

class MyObj(val i: Int)


val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我还没有弄明白如何获得预期的元组名称(_1_2,...)默认情况下不重命名它们-如果其他人想玩这个,这个是名称 "value"被引入的地方,而 这个是通常添加元组名称的地方。然而,关键在于我现在有了一个很好的结构化模式:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

所以,总的来说,这个解决办法:

  • 允许我们为元组获取单独的列(这样我们就可以再次加入元组,耶!)
  • 我们可以再次仅仅依赖于隐式函数(所以不需要到处传递 kryo)
  • 几乎完全向后兼容 import spark.implicits._(包括一些重命名)
  • 没有是否允许我们加入到 kyro序列化的二进制列中,更不用说那些可能具有
  • 将一些元组列重命名为“ value”(如果有必要,可以通过转换 .toDF、指定新的列名以及转换回数据集来撤消这个命名) ,而且模式名似乎通过连接保留,这是最需要它们的地方)。

一般类的部分解

这个问题不那么令人愉快,也没有好的解决办法。但是,现在我们已经有了上面的 tuple 解决方案,我有一个预感,从另一个答案隐式转换解决方案也会少一点痛苦,因为您可以将更复杂的类转换为 tuple。然后,在创建数据集之后,您可能会使用数据框架方法重命名列。如果一切顺利,这是 真的的一个改进,因为我现在可以在类的字段上执行连接了。如果我只使用一个平面二进制 kryo序列化程序,这是不可能的。

下面是一个实现了一些所有功能的示例: 我有一个类 MyObj,它具有类型为 Intjava.util.UUIDSet[String]的字段。第一个会自己照顾自己。第二种,如果存储为 String,那么使用 kryo进行序列化会更有用(因为 UUID通常是我想要加入的对象)。第三个实际上只属于二进制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])


// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])


// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

现在,我可以使用这个机器创建一个带有漂亮模式的数据集:

val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

这个模式向我展示了具有正确名称的列,以及前两个我可以加入的东西。

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

对于 JavaBean 类,这可能很有用

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

现在您可以简单地将 dataFrame 读取为自定义 DataFrame

dataFrame.as[MyClass]

这将创建一个自定义类编码器,而不是二进制编码器。

我的示例将使用 Java,但我不认为适应 Scala 会很困难。

只要 Fruit是一个简单的 爪哇豆,我已经非常成功地使用 火花,创建数据集编码器,豆子RDD<Fruit>转换为 Dataset<Fruit>

步骤1: 创建简单的 JavaBean。

public class Fruit implements Serializable {
private String name  = "default-fruit";
private String color = "default-color";


// AllArgsConstructor
public Fruit(String name, String color) {
this.name  = name;
this.color = color;
}


// NoArgsConstructor
public Fruit() {
this("default-fruit", "default-color");
}


// ...create getters and setters for above fields
// you figure it out
}

我会坚持使用原始类型和字符串作为字段的类,以免 DataBricks 的人员加强他们的 Encoders。如果您有一个具有嵌套对象的类,那么创建另一个简单的 JavaBean,它的所有字段都是扁平的,这样您就可以使用 RDD 转换将复杂类型映射到更简单的类型。当然这是一个小小的额外工作,但是我想它会对使用扁平模式的性能有很大的帮助。

步骤2: 从 RDD 获取数据集

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();


List<Fruit> fruitList = ImmutableList.of(
new Fruit("apple", "red"),
new Fruit("orange", "orange"),
new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);




RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

好了,泡沫,冲洗,重复。

对于那些可能在我的情况下,我把我的答案也放在这里。

具体来说,

  1. 我从 SQLContext 中读到“设置类型化数据”,所以原始数据格式是 DataFrame。

    Val sample = spot.sqlContext.sql (“ select 1 as a,Collection _ set (1) as b limit 1”) Show ()

    +---+---+ 我不知道你在说什么 +---+---+ 我不知道你在说什么 +---+---+

  2. 然后使用 RDD.map ()和 mutable. WrappedArray 类型将其转换为 RDD。

    样本 . rd.map (r = > (r.getInt (0) ,r.getAs [ mutable. WrappedArray [ Int ]](1) . toSet)) 。收集() . foreach (println)

    结果:

    (1,Set(1))

您可以使用 UDtRegistry,然后使用 Case 类、 Tuples 等等,所有这些都可以正确地与您的用户定义类型一起工作!

假设您想使用一个自定义 Enum:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

这样登记:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
override def sqlType: DataType = org.apache.spark.sql.types.StringType
override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
// Note that this will be a UTF8String type
override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}


// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

那就用啊!

case class UsingCustomEnum(id:Int, en:CustomEnum)


val seq = Seq(
UsingCustomEnum(1, Foo),
UsingCustomEnum(2, Bar),
UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

假设你想使用一个多态记录:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

然后像这样使用它:

case class UsingPoly(id:Int, poly:CustomPoly)


Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS


polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()

您可以编写一个将所有内容都编码为字节的自定义 UDT (我在这里使用的是 Java 序列化,但是最好使用 Spark 的 Kryo 上下文)。

首先定义 UDT 类:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
val kryo = new Kryo()


override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
override def serialize(obj: CustomPoly): Any = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(obj)


bos.toByteArray
}
override def deserialize(datum: Any): CustomPoly = {
val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val ois = new ObjectInputStream(bis)
val obj = ois.readObject()
obj.asInstanceOf[CustomPoly]
}


override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

然后登记:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

然后你就可以使用它了!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)


Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS


polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()

除了已经给出的建议之外,我最近发现的另一个选项是可以声明包含 trait org.apache.spark.sql.catalyst.DefinedByConstructorParams的自定义类。

如果类的构造函数使用 ExpressionEncoder 可以理解的类型,即基本值和标准集合,那么这种方法就可以工作。当你不能将类声明为 case 类,但是不想每次将它包含在数据集中时都使用 Kryo 来编码它时,它就会派上用场。

例如,我想声明一个包含 Breeze 向量的 case 类。唯一能够处理这个问题的编码器通常是 Kryo。但是如果我声明一个子类来扩展 Breeze DenseVector 和 Definition edByConstruction torParams,ExpressionEncoder 就会理解它可以被序列化为一个 Doubles 数组。

我是这么说的:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

现在,我可以使用一个简单的 ExpressionEncoder 在数据集中使用 SerializableDenseVector(直接使用,或作为 Product 的一部分使用) ,而不需要使用 Kryo。它的工作方式与 Breeze DenseVector 类似,但序列化为 Array [ Double ]。

@ Alec 的回答很棒! 只是在他/她的回答中添加一个评论:

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

@ Alec 提到:

没有办法为嵌套类型传递自定义编码器(我没有办法为 Spark 提供仅 MyObj 的编码器,这样它就知道如何编码 Wrap [ MyObj ]或(Int,MyObj))。

看起来是这样,因为如果我为 MyObj添加一个编码器:

implicit val myEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]

但它仍然失败了:

java.lang.UnsupportedOperationException: No Encoder found for MyObj
- field (class: "MyObj", name: "unwrap")
- root class: "Wrap"
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)

但请注意重要的错误信息:

根类: “ Wrap”

它实际上提示编码 MyObj是不够的,您必须编码包括 Wrap[T]在内的 整个链条

因此,如果我这样做,它 解决问题:

implicit val myWrapperEncoder = org.apache.spark.sql.Encoders.kryo[Wrap[MyObj]]

因此,@Alec 的评论是 没有,这是正确的:

我没有办法为 Spark 提供一个只针对 MyObj 的编码器,这样它就知道如何编码 Wrap [ MyObj ]或(Int,MyObj)

我们仍然使用 有办法为 MyObj提供 Spark 的编码器,这样它就知道如何编码 Wrap [ MyObj ]或(Int,MyObj)。