火花函数与 UDF 性能的比较?

Spark 现在提供了可以在数据框架中使用的预定义函数,而且它们似乎经过了高度优化。我最初的问题是哪个更快,但我自己做了一些测试,发现火花函数至少在一个实例中快了10倍左右。有没有人知道为什么会这样,什么时候 UDF 会更快(只有在相同的火花函数存在的情况下) ?

下面是我的测试代码(运行在 Databricks 社区 ed 上) :

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)


# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)


# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]


dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()

UDF 函数:

concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()

火花功能:

spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()

运行两次,UDF 通常需要1.1-1.4秒,而 Spark concat功能总是在0.15秒以下。

53656 次浏览

UDF 什么时候会更快

如果您询问有关 Python UDF 的问题,答案可能是从来没有 * 。由于 SQL 函数相对简单,而且不是为复杂任务设计的,因此几乎不可能弥补 Python 解释器和 JVM 之间重复序列化、反序列化和数据移动的成本。

有人知道为什么会这样吗

上面已经列举了主要原因,可以归结为一个简单的事实,即 Spark DataFrame本身是一个 JVM 结构,标准访问方法是通过对 Java API 的简单调用实现的。另一方面,UDF 是在 Python 中实现的,需要来回移动数据。

虽然 PySpark 通常需要在 JVM 和 Python 之间进行数据移动,但是对于低级 RDD API,它通常不需要昂贵的 serde 活动。Spark SQL 增加了序列化和序列化的额外成本,以及在 JVM 上将数据从不安全表示转移到不安全表示的成本。后者是特定于所有 UDF (Python、 Scala 和 Java)的,而前者是特定于非本地语言的。

与 UDF 不同,Spark SQL 函数直接在 JVM 上操作,并且通常与 Catalyst 和钨集成得很好。这意味着这些可以在执行计划中进行优化,并且大部分时间可以从 codgen 和其他钨优化中受益。此外,它们可以以“本机”表示形式对数据进行操作。

所以从某种意义上说,这里的问题在于 Python UDF 必须将数据带到代码中,而 SQL 表达式则相反。


* 根据 粗略估计 PySpark 窗口 UDF 可以击败 Scala 窗口函数。

多年以后,当我对这个问题有了更多的认识,并重新审视这个问题时,我才意识到@alfredox 真正想问的是什么。于是我又修改了一遍,把答案分成两部分:


要回答为什么本地 DF 函数(本地 Spark-SQL 函数)更快:

基本上,为什么本地 Spark 函数总是比 Spark UDF 快,无论您的 UDF 是用 Python 还是 Scala 实现的。

首先,我们需要了解什么是 ,也就是 在 Spark 1.4中首次引入

这是一个后端,也是它关注的焦点:

  1. 使用二进制内存数据表示即钨行格式的离堆内存管理和显式管理内存,
  2. Cache Locality 是关于高速缓存命中率的高速缓存感知计算和高速缓存感知布局,
  3. 全阶段代码生成(又名 CodeGen)。

最大的性能杀手之一是 GC。GC 将暂停 JVM 中的每个线程,直到 GC 完成。这正是引入离堆内存管理的原因。

当执行 Spark-SQL 本机函数时,数据将保留在 tunsten 后端。然而,在 Spark UDF 场景中,数据将从 tunsten 移出到 JVM (Scala 场景)或 JVM 和 Python Process (Python)中,以执行实际的进程,然后再移回到 tunsten。结果就是:

  1. 不可避免地,会有一笔开销/罚金:
    1. 反序列化钨的输入。
    2. 把输出序列化为钨。
  2. 即使使用第一类物件 Spark 中的 Scala,它也会增加 JVM 内的内存占用,这可能会在 JVM 中涉及更多的 GC这个问题正是钨“离堆内存管理”功能试图解决的问题。

要回答 Python 是否一定比 Scala 慢:

自2017年10月30日以来,Spark 刚刚推出了向量化的 pypark 无人机。

Https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Python UDF 之所以慢,可能是因为 PySpark UDF 没有以最优化的方式实现:

根据段落中的链接。

Spark 在0.7版本中添加了 Python API,支持用户定义函数。这些用户定义的函数操作 一次一行,因此承受着很高的序列化和调用开销。

然而,新矢量化的 udfs 似乎在很大程度上提高了性能:

范围从3倍到超过100倍。

enter image description here

在恢复使用自己的定制 UDF 函数之前,尽可能使用基于更高级标准 Column 的函数和 Dataset 操作符,因为 UDF 是 Spark 的 黑盒,所以它使用 想都别想来优化它们。

实际上,在屏幕背后发生的是 Catalyst 根本不能处理和优化 UDF,而且它威胁到了它们作为 BlackBox 的地位,这导致了许多优化的失败,比如谓词下推、常数折叠和其他许多优化。