Spark中DataFrame、Dataset和RDD的区别

我只是想知道Apache Spark中的RDDDataFrame (Spark 2.0.0 DataFrame只是Dataset[Row]的类型别名)之间的区别是什么?

你能把一个转换成另一个吗?

163072 次浏览

通过谷歌搜索“DataFrame definition”可以很好地定义DataFrame:

数据帧是一个表格,或二维数组状结构,在 每一列包含对一个变量的测量,以及每一行

因此,DataFrame由于其表格格式而具有额外的元数据,这允许Spark在最终查询上运行某些优化。

另一方面,RDD仅仅是一个Resilient的__abc2分布式__abc2数据集,它更像是一个数据黑箱,不能对其进行优化,因为可以对其执行的操作不受约束。

然而,你可以通过它的rdd方法从一个DataFrame到RDD,你也可以通过toDF方法从一个RDD到一个DataFrame(如果RDD是一个表格格式)

由于内置的查询优化,建议在可能的情况下使用DataFrame

一个DataFrame相当于RDBMS中的一个表,也可以以类似于rdd中的“原生”分布式集合的方式进行操作。与rdd不同,dataframe跟踪模式并支持各种关系操作,从而实现更优化的执行。 每个DataFrame对象代表一个逻辑计划,但由于它们的“惰性”性质,直到用户调用特定的“输出操作”才会执行

简单地说,RDD是核心组件,但DataFrame是spark 1.30中引入的API。

抽样

数据分区的集合,称为RDD。这些RDD必须遵循以下几个属性:

  • 不可变的,
  • 容错,
  • 分布式的,
  • 更多。

这里RDD要么是结构化的,要么是非结构化的。

DataFrame

DataFrame是一个在Scala, Java, Python和r中可用的API。它允许处理任何类型的结构化和半结构化数据。定义名为DataFrame的命名列中的分布式数据集合。你可以很容易地在DataFrame中优化RDDs。 您可以使用DataFrame一次性处理JSON数据、parquet数据、HiveQL数据
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")


val sample_DF = sampleRDD.toDF()

这里Sample_DF考虑为DataFramesampleRDD是(原始数据)称为RDD

因为DataFrame是弱类型的,开发人员没有得到类型系统的好处。例如,假设你想从SQL中读取一些东西,并对其运行一些聚合:

val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")


people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))

当你说people("deptId")时,你不会返回一个IntLong,你会返回一个你需要操作的Column对象。在具有丰富类型系统的语言(如Scala)中,您最终失去了所有类型安全,这增加了在编译时可以发现的运行时错误的数量。

相反,DataSet[T]是类型化的。当你这样做时:

val people: People = val people = sqlContext.read.parquet("...").as[People]

你实际上得到了一个People对象,其中deptId是一个实际的整型而不是列型,因此利用了类型系统。

从Spark 2.0开始,DataFrame和DataSet api将是统一的,其中DataFrame将是DataSet[Row]的类型别名。

首先,DataFrame是从SchemaRDD演变而来的。

 deprecated method toSchemaRDD

是的. .DataframeRDD之间的转换是绝对可能的。

下面是一些示例代码片段。

  • df.rddRDD[Row]

下面是一些创建数据框架的选项。

  • 1) yourrddOffrow.toDF转换为DataFrame

  • 2)使用sql context的createDataFrame

    val df = spark.createDataFrame(rddOfRow, schema) < / p >

正如漂亮的SO帖子所描述的…
.

,其中schema可以来自以下选项中的一些 从scala case类和scala反射api

import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]

或使用Encoders

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema

也可以使用StructTypeStructField < / p >

val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("col1", DoubleType, true))
.add(StructField("col2", DoubleType, true)) etc...

image description

实际上现在有3个Apache Spark api .

enter image description here

  1. __abc0 API:

RDD(弹性分布式数据集)API已经在Spark中

RDD API提供了许多转换方法,例如map(), filter()和reduce()用于对数据执行计算。每一个 这些方法的结果是一个新的RDD表示转换后的 数据。然而,这些方法只是定义操作 执行,并且直到一个操作才执行转换 方法。动作方法的例子有collect()和 saveAsObjectFile () . < / p >

< em >抽样的例子:< / em >

rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action

示例:RDD按属性过滤

rdd.filter(_.age > 21)
  1. DataFrame API

Spark 1.3引入了一个新的DataFrame API作为项目的一部分 钨倡议,旨在提高性能和 Spark的可扩展性。DataFrame API引入了一个概念 模式来描述数据,允许Spark管理模式和 只在节点之间传递数据,以一种比使用更有效的方式 Java序列化。< / p > DataFrame API与RDD API完全不同,因为它 是一个用于构建关系查询计划的API, Spark的Catalyst 优化器就可以执行了。这个API对于那些需要帮助的开发人员来说是很自然的 熟悉构建查询计划

示例SQL style:

df.filter("age > 21");

< >强限制: 因为代码是按名称引用数据属性的,所以编译器不可能捕捉到任何错误。如果属性名不正确,则该错误只会在运行时,即创建查询计划时检测到

DataFrame API的另一个缺点是它非常以scala为中心,虽然它支持Java,但支持是有限的。

例如,当从现有的Java对象的RDD创建DataFrame时,Spark的Catalyst优化器无法推断模式,并假设DataFrame中的任何对象都实现了scala.Product接口。Scala case class解决了这个问题,因为它们实现了这个接口。

  1. Dataset API

Dataset API,在Spark 1.6中作为API预览发布,旨在 两全其美;熟悉的面向对象 RDD API的编程风格和编译时类型安全 Catalyst查询优化器的性能优势。数据集 还要使用与之相同的高效堆外存储机制 DataFrame API。< / p > 当涉及到序列化数据时,Dataset API有一个概念 编码器在JVM表示(对象)和 Spark的内部二进制格式。Spark有内置编码器 非常先进的是,它们可以生成字节代码进行交互 堆外数据,并提供对个别属性的按需访问 而不需要反序列化整个对象。Spark还没有 提供一个用于实现自定义编码器的API,但这是计划中的

此外,Dataset API被设计为与 Java和Scala。在处理Java对象时,这很重要

Dataset API SQL style: . 示例

dataset.filter(_.age < 21);

< >强__ABC0 &DataSet: enter image description here < / p >

< >强Catalist level flow.。(解密spark峰会上的数据框架和数据集演示) enter image description here < / p >

进一步阅读…砖文章-三个Apache Spark api的故事:rdd vs dataframe和数据集

Apache Spark提供了三种类型的api

  1. 抽样
  2. DataFrame
  3. 数据集

 comparative RDD, Dataframe and Dataset APIs .

这里是RDD, Dataframe和Dataset之间的api比较。

抽样

Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是跨集群节点划分的元素集合,可以并行操作。

抽样特性:

    <李> < p > 分布式收集:
    RDD使用MapReduce操作,该操作被广泛用于处理和生成大型数据集,并在集群上使用并行分布式算法。它允许用户使用一组高级运算符来编写并行计算,而不必担心工作分配和容错
  • 不变的:由分区记录集合组成的rdd。分区是RDD中并行的基本单元,每个分区都是数据的一个逻辑分区,是不可变的,是通过对现有分区进行一些转换而创建的。不可变性有助于实现计算的一致性。

  • < p > < >强容错: 如果我们丢失了RDD的某个分区,我们可以在沿袭中重播该分区上的转换以实现相同的计算,而不是跨多个节点进行数据复制。这个特性是RDD最大的好处,因为它在数据管理和复制方面节省了大量的精力,从而实现了更快的计算
  • Spark中的所有转换都是懒惰的,因为它们不会立即计算结果。相反,它们只记住应用到一些基本数据集的转换。只有当操作需要将结果返回给驱动程序时,才计算转换。

  • < p > < >强劲功能转换: rdd支持两种类型的操作:转换(从现有数据集创建新数据集)和动作(在数据集上运行计算后将值返回给驱动程序)
  • <李> < p > 数据处理格式:
    它可以简单有效地处理结构化数据和非结构化数据 <李> < p > 支持的编程语言:
    RDD API可用于Java, Scala, Python和R.

抽样的局限性:

  • 没有内置优化引擎: 在处理结构化数据时,rdd无法利用Spark的高级优化器,包括catalyst优化器和Tungsten执行引擎。开发者需要根据每个RDD的属性对其进行优化
  • 处理结构化数据: 与Dataframe和数据集不同,rdd不推断所摄取数据的模式,并要求用户指定它

Dataframes

Spark在Spark 1.3版本中引入了Dataframes。Dataframe克服了rdd所面临的主要挑战。

DataFrame是一个分布式的数据集合,它被组织成命名的列。它在概念上等同于关系数据库或R/Python Dataframe中的表。除了Dataframe, Spark还引入了catalyst优化器,它利用高级编程特性来构建可扩展的查询优化器。

Dataframe特点:-

  • 行对象的分布式集合: DataFrame是一个分布式的数据集合,它被组织成命名的列。它在概念上等同于关系数据库中的表,但在底层进行了更丰富的优化

  • < p > < >强数据处理: 处理结构化和非结构化数据格式(Avro、CSV、弹性搜索、Cassandra)和存储系统(HDFS、HIVE表、MySQL等)。它可以读取和写入所有这些不同的数据源
  • 使用催化剂优化器优化: 它同时支持SQL查询和DataFrame API。数据框架采用催化树转换框架,分四阶段,

     1.Analyzing a logical plan to resolve references
    2.Logical plan optimization
    3.Physical planning
    4.Code generation to compile parts of the query to Java bytecode.
    
  • < p > < >强蜂巢兼容性: 使用Spark SQL,您可以在现有的Hive仓库上运行未经修改的Hive查询。它重用了Hive前端和MetaStore,并为您提供了与现有Hive数据,查询和udf的完全兼容性
  • < p > < >强钨: Tungsten提供了一个物理执行后端,它显式地管理内存并动态地为表达式求值生成字节码
  • <李> < p > 支持的编程语言:
    Dataframe API可用于Java、Scala、Python和R.

Dataframe限制:

  • 编译时类型安全: 如前所述,Dataframe API不支持编译时安全,这限制了你在不知道结构时操作数据。下面的示例在编译时工作。但是,当你执行这段代码时,你会得到一个运行时异常

例子:

case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name

这很有挑战性,特别是当您正在处理多个转换和聚合步骤时。

  • 不能操作域对象(丢失域对象): 一旦将域对象转换为数据框架,就不能从中重新生成数据框架。在下面的例子中,一旦我们从personRDD创建了personDF,我们将不会恢复Person类的原始RDD (RDD[Person])

例子:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

数据集的API

Dataset API是DataFrames的扩展,它提供了一个类型安全的面向对象的编程接口。它是映射到关系模式的强类型、不可变对象集合。

在数据集的核心,API是一个叫做编码器的新概念,它负责在JVM对象和表格表示之间进行转换。表格表示使用Spark内部的钨二进制格式存储,允许对序列化数据进行操作,并提高内存利用率。Spark 1.6支持为各种类型自动生成编码器,包括基本类型(例如String、Integer、Long)、Scala case类和Java bean。

数据集的特性:

  • 提供最好的RDD和Dataframe: RDD(函数式编程,类型安全),DataFrame(关系模型,查询优化,钨执行,排序和变换)

  • < p > < >强编码器: 使用编码器,可以很容易地将任何JVM对象转换为数据集,允许用户使用结构化和非结构化数据,而不像Dataframe
  • 支持的编程语言: Datasets API目前仅在Scala和Java中可用。Python和R目前在1.6版中不受支持。Python支持将在2.0版本
  • < p > < >强类型安全: 数据集API提供了编译时安全,这在数据框架中是不存在的。在下面的例子中,我们可以看到Dataset如何使用compile lambda函数对域对象进行操作

例子:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
  • 可互操作的:数据集允许您轻松地将现有的rdd和dataframe转换为数据集,而无需样板代码。

数据集API限制:-

  • 要求类型转换为字符串: 目前从数据集中查询数据需要我们将类中的字段指定为字符串。查询完数据后,必须将列强制转换为所需的数据类型。另一方面,如果我们在数据集上使用map操作,它将不会使用Catalyst优化器

例子:

ds.select(col("name").as[String], $"age".as[Int]).collect()

不支持Python和R:从1.6版开始,数据集只支持Scala和Java。Python支持将在Spark 2.0中引入。

Datasets API与现有的RDD和Dataframe API相比,具有更好的类型安全性和函数式编程优势。面对API中类型强制转换需求的挑战,您仍然无法获得所需的类型安全性,并将使您的代码变得脆弱。

Dataframe是Row对象的RDD,每个对象代表一条记录。一个 Dataframe还知道它的行的模式(即数据字段)。虽然Dataframes 看起来像常规的rdd,它们内部以更有效的方式存储数据,利用它们的模式。此外,它们还提供了rdd上不可用的新操作,例如运行SQL查询的能力。数据帧可以从外部数据源、查询结果或常规rdd中创建

参考文献:Zaharia M., et al。学习火花(O'Reilly, 2015)

大部分答案都是正确的,我只想补充一点

在Spark 2.0中,这两个API (DataFrame +DataSet)将统一为一个API。

统一DataFrame和Dataset:在Scala和Java中,DataFrame和Dataset是统一的,即DataFrame只是Dataset of Row的类型别名。在Python和R中,由于缺乏类型安全,DataFrame是主要的编程接口。”

数据集类似于rdd,但是,它们不使用Java序列化或Kryo,而是使用专门的Encoder来序列化对象,以便在网络上进行处理或传输。

Spark SQL支持两种将现有rdd转换为数据集的方法。第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,如果在编写Spark应用程序时已经知道模式,这种方法也能很好地工作。

创建数据集的第二种方法是通过编程接口,该接口允许您构造一个模式,然后将其应用于现有的RDD。虽然此方法更详细,但它允许您在运行时之前不知道列及其类型时构造数据集。

在这里你可以找到RDD tof数据帧对话的答案

如何在spark中将rdd对象转换为dataframe

所有(RDD、DataFrame和DataSet)在一张图片中。

RDD vs DataFrame vs DataSet

image credit

RDD

RDD是一个可以并行操作的元素的容错集合。

DataFrame

DataFrame是一个被组织成命名列的数据集。它是 概念上等价于关系数据库中的表或数据 在R/Python中,但在引擎盖下有更丰富的优化.

Dataset

Dataset是数据的分布式集合。Dataset是Spark 1.6中新增的接口,提供rdd的优势 (强类型,能够使用强大的lambda函数) Spark SQL的优化执行引擎的好处。


注意:

Scala/Java中的行数据集 (Dataset[Row])通常指作为DataFrames


用一个代码片段对它们进行了很好的比较。

RDD vs DataFrame vs DataSet with code

source


问:你能把一个转换成另一个,像RDD到DataFrame,反之亦然?

是的,两者都有可能

1. RDDDataFrame,使用.toDF()

val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)


val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")


df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+

更多方法:在Spark中将RDD对象转换为Dataframe

2. 使用.rdd()方法DataFrame/DataSetRDD

val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD

从使用的角度来看,RDD vs DataFrame:

  1. rdd是惊人的!因为它们让我们可以灵活地处理几乎任何类型的数据;非结构化、半结构化和结构化数据。因为,很多时候数据还没有准备好适合一个DataFrame,(甚至是JSON), rdd可以用来对数据进行预处理,这样它就可以适合一个DataFrame。rdd是Spark的核心数据抽象。
  2. 并非所有可能在RDD上的转换都可能在DataFrame上,例如subtract()是用于RDD,而except()是用于DataFrame。
  3. 由于dataframe类似于关系表,因此在使用集/关系理论转换时,它们遵循严格的规则,例如,如果您想合并两个dataframe,则要求两个dfs具有相同数量的列和相关的列数据类型。列名可以不同。这些规则不适用于rdd。这是一个很好的教程解释这些事实。
  4. 使用dataframe时,性能会有所提高,其他人已经深入解释过了。
  5. 使用dataframe时,您不需要像使用rdd编程时那样传递任意函数。
  6. 你需要SQLContext/HiveContext来编程数据帧,因为它们位于spark生态系统的SparkSQL区域,但对于RDD,你只需要spark Core库中的SparkContext/JavaSparkContext。
  7. 如果可以为RDD定义模式,则可以从RDD创建df。
  8. 你也可以将df转换为rdd, rdd转换为df。

我希望这能有所帮助!

DataFrame是一个有模式的RDD。您可以把它看作一个关系数据库表,其中每一列都有一个名称和一个已知的类型。DataFrames的强大之处在于这样一个事实:当你从结构化数据集(Json, Parquet..)创建一个DataFrame时,Spark能够通过传递正在加载的整个数据集(Json, Parquet..)来推断一个模式。然后,在计算执行计划时,Spark可以使用该模式并进行更好的计算优化。 注意DataFrame在Spark v1.3.0之前被称为SchemaRDD

Spark RDD (resilient distributed dataset):

RDD是核心数据抽象API,从Spark的第一个版本(Spark 1.0)开始就可用了。它是用于操作分布式数据集合的低级API。RDD api公开了一些非常有用的方法,可用于对底层物理数据结构进行非常严格的控制。它是分布在不同机器上的分区数据的不可变(只读)集合。RDD可以在大集群上进行内存计算,以容错的方式加快大数据的处理速度。 为了实现容错,RDD使用DAG(有向无环图),它由一组顶点和边组成。DAG中的顶点和边分别表示RDD和应用于该RDD的操作。RDD上定义的转换是惰性的,仅当一个操作被调用

时才执行

Spark DataFrame:

Spark 1.3引入了两个新的数据抽象api——DataFrame和DataSet。DataFrame api将数据组织成命名的列,就像关系数据库中的表一样。它使程序员能够在分布式数据集合上定义模式。DataFrame中的每一行都是对象类型行。与SQL表一样,DataFrame中的每一列必须具有相同的行数。简而言之,DataFrame是一种惰性评估计划,它指定了需要在分布式数据集合上执行的操作。DataFrame也是一个不可变的集合。

Spark DataSet:

作为DataFrame api的扩展,Spark 1.3还引入了DataSet api,在Spark中提供严格类型和面向对象的编程接口。它是不可变的、类型安全的分布式数据集合。像DataFrame一样,DataSet APIs也使用Catalyst引擎来实现执行优化。DataSet是DataFrame api的扩展。

Other Differences -

enter image description here

Apache Spark - RDD, DataFrame和DataSet

火花抽样 -

RDD代表弹性分布式数据集。只读 记录的分区集合。RDD是最基本的数据结构 的火花。它允许程序员在内存中执行计算 采用容错方式的大型集群。

火花Dataframe -

与RDD不同,数据被组织成命名列。比如一张表 在关系数据库中。的不可变分布式集合 数据。Spark中的DataFrame允许开发人员在上面强加一个结构 一个分布式的数据集合,允许更高层次的抽象

火花数据集 -

Apache Spark中的数据集是DataFrame API的扩展 提供类型安全的面向对象编程接口。数据集 通过暴露表达式来利用Spark的Catalyst优化器 和数据字段到查询计划器

a. RDD (Spark1.0) ->Dataframe (Spark1.3)→数据集(Spark1.6)

b. RDD让我们决定如何做,这限制了Spark在底层处理上的优化。dataframe/dataset让我们决定我们想做什么,并把一切都留给Spark来决定如何进行计算。

作为内存中的jvm对象,RDD涉及到垃圾收集和Java(或稍微好一点的Kryo)序列化的开销,当数据增长时,这些开销是昂贵的。这会降低性能。

数据帧比rdd提供了巨大的性能提升,因为它有2个强大的特性:

  1. 自定义内存管理(又名Project Tungsten)
  2. 优化的执行计划(又名催化剂优化器)
    性能明智的RDD ->数据帧->李集< / >

d.数据集(Project Tungsten和Catalyst Optimizer)如何在数据帧上得分是它拥有的另一个功能:编码器