Scala vs Python的Spark性能

比起Scala,我更喜欢Python。但是,由于Spark本身是用Scala编写的,我希望我的代码在Scala中比Python版本运行得更快,原因很明显。

带着这样的假设,我想学习&为1 GB的数据写一些非常常见的Scala版本的预处理代码。数据选自Kaggle上的SpringLeaf比赛。只是简单介绍一下数据(它包含1936个维度和145232行)。数据由各种类型组成,例如int型,浮点型,字符串,布尔型。我使用8个内核中的6个进行Spark处理;这就是为什么我使用minPartitions=6,这样每个核都有一些东西要处理。

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)


val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"


def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")


for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}


val input3 = input2.flatMap(separateCols)


def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}


val input4 = input3.map(toKeyVal)


def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}


val input5 = input4.reduceByKey(valsConcat)


input5.saveAsTextFile("output")

Python代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'




def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr


input2 = input.mapPartitionsWithIndex(drop_first_line)


def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2




input3 = input2.flatMap(separate_cols)


def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)


def vals_concat(v1, v2):
return v1 + ',' + v2


input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

< >强Scala的性能 第0阶段(38分钟),第1阶段(18秒) enter image description here < / p >

< >强Python的性能 第0阶段(11分钟),第1阶段(7秒) enter image description here < / p >

两者都生成了不同的DAG可视化图(由于这两张图显示了Scala (map)和Python (reduceByKey)的不同阶段0函数)

但是,本质上,这两个代码都试图将数据转换为(维度_id,值列表字符串)RDD并保存到磁盘。输出将用于计算每个维度的各种统计信息。

性能方面,对于这样的真实数据,Scala代码似乎比Python版本运行慢4倍。 对我来说,好消息是它给了我继续使用Python的良好动力。坏消息是我不太明白为什么?< / p >

59456 次浏览

讨论代码的原始答案可以在下面找到。


首先,您必须区分不同类型的API,每种API都有自己的性能考虑因素。

抽样API

(纯Python结构与基于JVM的编制)

这是受Python代码性能和PySpark实现细节影响最大的组件。虽然Python的性能不太可能成为问题,但至少有几个因素是你必须考虑的:

  • JVM通信的开销。实际上,所有进出Python executor的数据都必须通过套接字和JVM worker传递。虽然这是一种相对有效的本地通信,但它仍然不是免费的。
  • 基于进程的执行器(Python)与基于线程的执行器(单个JVM多线程)(Scala)。每个Python执行程序都在自己的进程中运行。作为一个副作用,它提供了比JVM更强的隔离和对执行程序生命周期的一些控制,但可能会显著提高内存使用:

    • 解释器内存占用
    • 所加载的库的占用空间
    • 低效率的广播(每个进程都需要自己的广播副本)
    • 李< / ul > < / >
    • Python代码本身的性能。一般来说,Scala比Python快,但它会因任务而异。此外,您有多个选项,包括JITs,如Numba, C扩展(Cython)或专门的库,如Theano。最后,如果你不使用ML / MLlib(或简单的NumPy堆栈),考虑使用PyPy作为替代解释器。参见SPARK-3094

    • PySpark配置提供了spark.python.worker.reuse选项,可用于选择为每个任务创建Python进程或重用现有进程。后一个选项似乎有助于避免昂贵的垃圾收集(它更像是一种印象,而不是系统测试的结果),而前一个选项(默认)是在昂贵的广播和导入情况下的最佳选择。
    • 引用计数在CPython中用作第一行垃圾收集方法,在典型的Spark工作负载(流处理,没有引用周期)下工作得非常好,并降低了长时间GC暂停的风险。

    MLlib

    (Python和JVM混合执行)

    基本的考虑因素与以前几乎相同,但有一些额外的问题。虽然MLlib使用的基本结构是普通的Python RDD对象,但所有算法都直接使用Scala执行。

    这意味着将Python对象转换为Scala对象或将Python对象转换为Scala对象会增加额外的成本,增加内存使用,以及稍后我们将介绍的一些额外限制。

    到目前为止(Spark 2.x),基于rdd的API处于维护模式,并且是计划在Spark 3.0中移除

    DataFrame API和Spark ML

    (使用Python代码的JVM执行仅限于驱动程序)

    这些可能是标准数据处理任务的最佳选择。由于Python代码主要局限于驱动程序上的高级逻辑操作,因此Python和Scala之间应该没有性能差异。

    唯一的例外是按行使用Python udf,它的效率明显低于它们的Scala对等物。虽然有一些改进的机会(在Spark 2.0.0中已经有了实质性的开发),但最大的限制是内部表示(JVM)和Python解释器之间的完全往返。如果可能,您应该支持内置的表达式(例子的组合。Python UDF行为在Spark 2.0.0中得到了改进,但与本机执行相比,它仍然不是最优的。

    随着向量化udf (SPARK-21190及其扩展)的引入,这个将来可以改进吗有了显著的改进,它使用箭头流进行有效的零拷贝反序列化数据交换。对于大多数应用程序,它们的次要开销可以忽略不计。

    还要确保避免在DataFramesRDDs之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输了。

    值得注意的是,Py4J调用具有相当高的延迟。这包括简单的调用,如:

    from pyspark.sql.functions import col
    
    
    col("foo")
    

    通常,这并不重要(开销是恒定的,不依赖于数据量),但对于软实时应用程序,您可以考虑缓存/重用Java包装器。

    GraphX和Spark数据集

    至于现在(Spark 1.6 2.1),两者都不提供PySpark API,所以你可以说PySpark比Scala差得多。

    GraphX

    实际上,GraphX的开发几乎完全停止了,项目目前处于相关JIRA门票关闭为不会修复的维护模式。GraphFrames库提供了一个带有Python绑定的备选图形处理库。

    数据集

    从主观上讲,Python中静态类型Datasets的位置并不多,即使有,当前的Scala实现也太简单了,不能提供与DataFrame相同的性能优势。

    流媒体

    从我目前看到的情况来看,我强烈建议使用Scala而不是Python。如果PySpark支持结构化流,未来可能会有所改变,但现在Scala API似乎更健壮、更全面、更高效。我的经验很有限。

    Spark 2中的结构化流。X似乎缩小了语言之间的差距,但目前它仍处于早期阶段。然而,基于RDD的API已经在砖的文档(访问日期2017-03-03)中被引用为“遗留流”,因此有理由期待进一步的统一努力。

    不履行注意事项

    奇偶特性

    并非所有Spark特性都是通过PySpark API公开的。一定要检查您需要的部分是否已经实现,并尝试了解可能的限制。

    当你使用MLlib和类似的混合上下文(参见从任务调用Java/Scala函数)时,这一点尤其重要。公平地说,PySpark API的某些部分,如mllib.linalg,提供了比Scala更全面的方法集。

    API设计

    PySpark API紧密地反映了它的Scala对等物,因此并不完全是python式的。这意味着在语言之间进行映射非常容易,但与此同时,Python代码可能非常难以理解。

    复杂的结构

    与纯JVM执行相比,PySpark数据流相对复杂。对PySpark程序或调试进行推理要困难得多。此外,至少要对Scala和JVM有基本的了解。

    火花2。X及以上

    不断向Dataset API转变,冻结RDD API为Python用户带来了机遇和挑战。虽然API的高级部分更容易在Python中公开,但更高级的功能几乎不可能被使用直接

    此外,本地Python函数在SQL世界中仍然是二等公民。希望将来Apache Arrow序列化(目前的努力目标数据collection,但UDF serde是长期目标)能改善这一点。

    对于强烈依赖Python代码库的项目,纯Python替代品(如Dask)可能是一个有趣的替代品。

    不一定非得是一个对另一个

    Spark DataFrame (SQL, Dataset) API提供了一种优雅的方式来集成Scala/Java代码在PySpark应用程序。您可以使用DataFrames将数据暴露给本机JVM代码并读取结果。我已经解释了一些选项别的地方,你可以在如何在Pyspark中使用Scala类中找到Python-Scala往返的工作示例。

    它可以通过引入用户定义类型(参见如何定义模式自定义类型在Spark SQL?)来进一步扩充。


    问题中提供的代码有什么问题

    (免责声明:python的观点。很可能我错过了一些Scala技巧)

    首先,你的代码中有一部分完全没有意义。如果你已经使用zipWithIndexenumerate创建了(key, value)对,那么创建字符串只是为了随后拆分它有什么意义呢?flatMap不是递归工作的,所以你可以简单地生成元组并跳过后面的map

    另一部分我发现有问题的是reduceByKey。一般来说,如果应用聚合函数可以减少必须打乱的数据量,reduceByKey是有用的。因为您只是简单地连接字符串,所以这里没有什么可获得的。忽略低级别的东西,比如引用的数量,你必须传输的数据量与groupByKey完全相同。

    通常情况下,我不会详述这一点,但据我所知,这是Scala代码中的一个瓶颈。在JVM上联接字符串是一个相当昂贵的操作(例如:在scala中的字符串连接是昂贵的,因为它是在Java?)。这意味着像_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)这样的东西在你的代码中等价于input4.reduceByKey(valsConcat)不是一个好主意。

    如果你想避免groupByKey,你可以尝试使用aggregateByKeyStringBuilder。类似的东西应该可以达到这个目的:

    rdd.aggregateByKey(new StringBuilder)(
    (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
    },
    (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
    }
    )
    

    但我怀疑这值得大惊小怪。

    记住上面的内容,我重写了你的代码,如下所示:

    Scala:

    val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
    (idx, iter) => if (idx == 0) iter.drop(1) else iter
    }
    
    
    val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
    case ("true", i) => (i, "1")
    case ("false", i) => (i, "0")
    case p => p.swap
    })
    
    
    val result = pairs.groupByKey.map{
    case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
    }
    }
    
    
    result.saveAsTextFile("scalaout")
    

    Python:

    def drop_first_line(index, itr):
    if index == 0:
    return iter(list(itr)[1:])
    else:
    return itr
    
    
    def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
    yield (i, x)
    
    
    input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))
    
    
    pairs = input.flatMap(separate_cols)
    
    
    result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
    
    
    result.saveAsTextFile("pythonout")
    

    结果

    local[6]模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器占用4GB内存(n = 3):

    • Scala - mean: 250.00s, stdev: 12.49
    • Python - mean: 246.66s, stdev: 1.15

    我敢肯定,大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,这里是Python中简单的单线程代码,它在这台机器上执行相同的任务不到一分钟:

    def go():
    with open("train.csv") as fr:
    lines = [
    line.replace('true', '1').replace('false', '0').split(",")
    for line in fr]
    return zip(*lines[1:])
    

延伸至以上答案-

Scala在许多方面都比python更快,但python比Scala更受欢迎是有一些合理的原因的,让我们看看其中的几个

Python for Apache Spark非常容易学习和使用。然而,这并不是Pyspark比Scala更好的唯一原因。有更多的。

用于Spark的Python API在集群上可能更慢,但最终,与Scala相比,数据科学家可以用它做更多的事情。Scala的复杂性是不存在的。界面简单全面。

谈到代码的可读性,维护和熟悉Apache的Python API, Spark远比Scala好。

Python附带了几个与机器学习和自然语言处理相关的库。这有助于数据分析,也有非常成熟和经过时间检验的统计数据。例如numpy, pandas, scikit-learn, seaborn和matplotlib。

注意:大多数数据科学家使用混合方法,即使用这两种api的优点。

最后,Scala社区对程序员的帮助很少。这使得Python成为一门非常有价值的学习。如果您对任何静态类型编程语言(如Java)都有足够的经验,那么您就不必担心完全不用Scala了。