因此,我花了很多年在面向对象的世界中,代码重用、设计模式和最佳实践总是被考虑在内,我发现自己在 Spark 的世界中有些挣扎于代码组织和代码重用。
如果我尝试以可重用的方式编写代码,几乎总是伴随着性能成本,最终我会将其重写为对我的特定用例来说最优的代码。这种不变的“为特定用例编写最优的代码”也会影响代码的组织,因为当“所有代码真正属于一起”时,将代码分割成不同的对象或模块是很困难的,因此我最终只能得到极少数包含长长的复杂转换链的“上帝”对象。事实上,我经常想,如果我回顾一下我在面向对象世界工作时所写的大部分 Spark 代码,我会畏缩不前,把它当作“意大利面条代码”而不屑一顾。
我上网搜索,试图找到一些类似于面向对象世界的最佳实践的东西,但没有多少运气。我可以找到一些函数式编程的“最佳实践”,但 Spark 只是添加了一个额外的层,因为性能是这里的一个主要因素。
所以我的问题是,你们当中有没有人找到了编写 Spark 代码的最佳实践,可以推荐给我们?
剪辑
正如我在评论中所写的,我并没有期望任何人就如何使用 解决解决这个问题发布一个答案,而是希望这个社区中的某个人能遇到一些 Martin Fowler 类型的人,他在某个地方写了一些关于如何解决 Spark 世界中代码组织问题的文章或博客文章。
@ DanielDarabos 建议我举一个代码组织和性能冲突的例子。虽然我发现我在日常工作中经常遇到这个问题,但是我发现很难把它归结为一个很好的最小的例子;)但是我会尝试。
在面向对象的世界里,我是单一责任原则的忠实拥护者,所以我会确保我的方法只负责一件事情。这使得它们可重用且易于测试。因此,如果我必须计算一个列表中某些数字的和(匹配某些标准) ,并且必须计算相同数字的平均值,我肯定会创建两个方法——一个计算总和,另一个计算平均值。像这样:
def main(implicit args: Array[String]): Unit = {
val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))
println("Summed weights for DK = " + summedWeights(list, "DK")
println("Averaged weights for DK = " + averagedWeights(list, "DK")
}
def summedWeights(list: List, country: String): Double = {
list.filter(_._1 == country).map(_._2).sum
}
def averagedWeights(list: List, country: String): Double = {
val filteredByCountry = list.filter(_._1 == country)
filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
我当然可以继续履行 Spark 的 SRP:
def main(implicit args: Array[String]): Unit = {
val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")
println("Summed weights for DK = " + summedWeights(df, "DK")
println("Averaged weights for DK = " + averagedWeights(df, "DK")
}
def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter('country === country)
val summedWeight = countrySpecific.agg(avg('weight))
summedWeight.first().getDouble(0)
}
def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter('country === country)
val summedWeight = countrySpecific.agg(sum('weight))
summedWeight.first().getDouble(0)
}
但是因为我的 df
可能包含数十亿行,我宁愿不执行 filter
两次。事实上,性能直接与 EMR 成本挂钩,所以我真的不希望这样。为了克服这个问题,我决定违反 SRP,简单地将两个功能合二为一,并确保我呼吁坚持使用国家过滤的 DataFrame
,像这样:
def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
val averagedWeights = summedWeights / countrySpecific.count()
(summedWeights, averagedWeights)
}
现在,这个例子当然是对现实生活中遇到的问题进行了大量的简化。在这里,我可以简单地通过过滤和持久化 df
之前将它交给 sum 和 avg 函数(这也将是更多的 SRP)来解决它,但在现实生活中,可能需要一次又一次地进行大量中间计算。换句话说,这里的 filter
函数只是试图创建一个 很简单示例,这个示例将从持久化中受益。事实上,我认为对 persist
的调用在这里是一个关键字。调用 persist
将极大地加快我的工作,但代价是我必须紧密耦合所有依赖于持久化 DataFrame
的代码-即使它们在逻辑上是独立的。