如何检查火花数据帧是否为空?

现在,我必须使用 df.count > 0来检查 DataFrame是否为空。但效率有点低。还有更好的办法吗?

PS: 我想检查它是否是空的,这样我只保存 DataFrame,如果它不是空的

172921 次浏览

您可以利用 head()(或 first())函数来查看 DataFrame是否只有一行。如果是这样,它就不是空的。

我会说只需要抓取底层的 RDD。在 Scala 中:

df.rdd.isEmpty

在 Python 中:

df.rdd.isEmpty()

也就是说,所有这一切都是调用 take(1).length,所以它会做同样的事情,Rohan 回答... 只是可能稍微更明确?

如果你做 df.count > 0。它获取所有执行程序的所有分区的计数,并在 Driver 上将它们相加。当处理数百万行时,这需要一段时间。

最好的方法是执行 df.take(1)并检查它是否为 null。这将返回 java.util.NoSuchElementException所以更好地把一个尝试左右 df.take(1)

当完成 take(1)而不是空行时,数据帧返回一个错误。我已经高亮显示了它抛出错误的特定代码行。

enter image description here

对于 Spark 2.1.0,我的建议是将 head(n: Int)take(n: Int)isEmpty一起使用,无论哪一个对您有最明确的意图。

df.head(1).isEmpty
df.take(1).isEmpty

相当于 Python:

len(df.head(1)) == 0  # or bool(df.head(1))
len(df.take(1)) == 0  # or bool(df.take(1))

如果 DataFrame 为空,则使用 df.first()df.head()都将返回 java.util.NoSuchElementExceptionfirst()直接调用 head()head()调用 head(1).head

def first(): T = head()
def head(): T = head(1).head

head(1)返回一个 Array,因此在 Array 上使用 head会在 DataFrame 为空时导致 java.util.NoSuchElementException

def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)

因此,不要调用 head(),而是直接使用 head(1)获取数组,然后可以使用 isEmpty

take(n)也等于 head(n)..。

def take(n: Int): Array[T] = head(n)

而且 limit(1).collect()等价于 head(1)(请注意 head(n: Int)方法中的 limit(n).queryExecution) ,所以以下内容都是等价的,至少从我所知道的来看是这样,而且当 DataFrame 为空时,您不必捕获 java.util.NoSuchElementException异常。

df.head(1).isEmpty
df.take(1).isEmpty
df.limit(1).collect().isEmpty

我知道这是一个老问题,所以希望它将有助于人们使用新版本的 Spark。

df1.take(1).length>0

take方法返回行数组,因此如果数组大小等于零,则 df中没有记录。

你可以这样做:

val df = sqlContext.emptyDataFrame
if( df.eq(sqlContext.emptyDataFrame) )
println("empty df ")
else
println("normal df")

在 Scala 中,可以对 使用隐式方法 isEmpty(),对 DataFrame API 使用隐式方法 nonEmpty(),这将使代码更易于阅读。

object DataFrameExtensions {
implicit def extendedDataFrame(dataFrame: DataFrame): ExtendedDataFrame =
new ExtendedDataFrame(dataFrame: DataFrame)


class ExtendedDataFrame(dataFrame: DataFrame) {
def isEmpty(): Boolean = dataFrame.head(1).isEmpty // Any implementation can be used
def nonEmpty(): Boolean = !isEmpty
}
}

在这里,还可以添加其他方法。若要使用隐式转换,请在希望使用扩展功能的文件中使用 import DataFrameExtensions._。然后,可以直接使用这些方法:

val df: DataFrame = ...
if (df.isEmpty) {
// Do something
}

我在一些案例中发现:

>>>print(type(df))
<class 'pyspark.sql.dataframe.DataFrame'>


>>>df.take(1).isEmpty
'list' object has no attribute 'isEmpty'

这对于“ length”或者 take ()替换为 head ()也是一样的

[解决方案]解决我们可以利用的问题。

>>>df.limit(2).count() > 1
False

对于 Java 用户,您可以在数据集上使用它:

public boolean isDatasetEmpty(Dataset<Row> ds) {
boolean isEmpty;
try {
isEmpty = ((Row[]) ds.head(1)).length == 0;
} catch (Exception e) {
return true;
}
return isEmpty;
}

这将检查所有可能的场景(空、空)。

如果你正在使用火花,你也可以这样做:

len(df.head(1)) > 0

从 Spark 2.4.0开始,就有了 Dataset.isEmpty

它的 实施是:

def isEmpty: Boolean =
withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0) == 0
}

请注意,在 Scala 中,DataFrame不再是一个类,它只是一个 类型化名(可能随着 Spark 2.0而改变) :

type DataFrame = Dataset[Row]

在 PySpark 上,还可以使用此 bool(df.head(1))获得 False值的 True

如果数据帧不包含行,则返回 False

我有同样的问题,我测试了3个主要的解决方案:

  1. (df != null) && (df.count > 0)
  2. df.head(1).isEmpty()的@hulin003建议
  3. df.rdd.isEmpty()就像@Justin Pihony 建议的那样

当然,这3种方法都是有效的,但是就性能而言,我发现,当我在我的机器上在同一个 DF 上执行这些方法时,就执行时间而言:

  1. 它需要 ~ 9366ms
  2. 它需要 ~ 5607毫秒
  3. 它需要 ~ 1921毫秒

因此我认为最好的解决方案是 df.rdd.isEmpty()就像@Justin Pihony 建议的那样

dataframe.limit(1).count > 0

这也会触发一个任务,但是由于我们选择单个记录,即使是十亿比例的记录,耗费的时间也可以少得多。

来自: Https://medium.com/checking-emptiness-in-distributed-objects/count-vs-isempty-surprised-to-see-the-impact-fa70c0246ee0

假设我们有以下空数据框:

df = spark.sql("show tables").limit(0)

如果您正在使用 Spark 2.1 for pypark 来检查此数据框是否为空,可以使用:

df.count() > 0

或者

bool(df.head(1))

PySpark 3.3.0 +/Scala 2.4.0 +

df.isEmpty()