当从文本文件或集合(或从另一个 RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用“ cache”或“ keep”来将 RDD 数据存储到内存中?或者 RDD 数据默认以分布式方式存储在内存中?
val textFile = sc.textFile("/user/emp.txt")
根据我的理解,在上述步骤之后,textFile 是一个 RDD,并且可以在节点的所有/部分内存中使用。
如果是这样,为什么我们需要调用“缓存”或“持久化”的文本文件 RDD 呢?
我们是否需要显式地调用“ cache”或“ keep”来将 RDD 数据存储到内存中?
是的,如果需要的话。
默认情况下,RDD 数据以分布式方式存储在内存中?
不!
原因如下:
Spark 支持两种类型的共享变量: 广播变量,可用于在所有节点上缓存内存中的值; 累加器,是只“添加”到其中的变量,例如计数器和和。
RDD 支持两种类型的操作: 转换(从现有数据集创建新数据集)和操作(在数据集上运行计算后将值返回给驱动程序)。例如,map 是一个转换,它通过函数传递每个数据集元素,并返回一个表示结果的新 RDD。另一方面,reduce 是一个动作,它使用某个函数聚合 RDD 的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行 reduceByKey,它返回一个分布式数据集)。
Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,它们只是记住应用于某些基本数据集(例如文件)的转换。只有当操作需要将结果返回到驱动程序时,才计算转换。这种设计使 Spark 能够更有效地运行——例如,我们可以认识到通过 map 创建的数据集将用于 reduce,并且只将 reduce 的结果返回给驱动程序,而不是更大的映射数据集。
默认情况下,每次在转换后的 RDD 上运行操作时,都可以重新计算每个转换后的 RDD。然而,您也可以使用持久化(或缓存)方法在内存中持久化 RDD,在这种情况下,Spark 将在集群中保留元素,以便下次查询时更快地访问它。还支持将 RDD 持久化到磁盘上,或者跨多个节点复制。
详情请参阅 火花编程指南。
大多数 RDD 操作是懒惰的。可以将 RDD 看作一系列操作的描述。RDD 不是数据。所以这句话:
没用的。它创建了一个 RDD,表示“我们需要加载这个文件”。此时未加载该文件。
需要观察数据内容的 RDD 操作不能懒惰。(这些被称为 行动。)例如 RDD.countー要告诉您文件中的行数,需要读取文件。因此,如果您编写 textFile.count,在这一点上,文件将被读取,行将被计数,并且计数将被返回。
RDD.count
textFile.count
如果您再次调用 textFile.count怎么办?同样的事情: 文件将被读取并重新计数。没有储存任何东西。RDD 不是数据。
那么 RDD.cache是做什么的呢? 如果你在上面的代码中加入 textFile.cache:
RDD.cache
textFile.cache
val textFile = sc.textFile("/user/emp.txt") textFile.cache
没用的。RDD.cache也是一个惰性操作。文件仍未读取。但是现在 RDD 说“读取这个文件,然后缓存内容”。如果第一次运行 textFile.count,文件将被加载、缓存和计数。如果第二次调用 textFile.count,操作将使用缓存。它只是从缓存中获取数据并计算行数。
缓存行为取决于可用内存。例如,如果文件在内存中不适合,那么 textFile.count将回到通常的行为并重新读取文件。
我认为这个问题可以更好地表述为:
火花进程是懒惰的,也就是说,除非需要,否则不会发生任何事情。 为了快速回答这个问题,在发出 val textFile = sc.textFile("/user/emp.txt")之后,数据没有发生任何变化,只是构造了一个 HadoopRDD,使用文件作为源。
HadoopRDD
假设我们稍微转换一下数据:
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
同样,数据没有发生任何变化。现在有了一个新的 RDD wordsRDD,它包含对 testFile的引用和一个在需要时应用的函数。
wordsRDD
testFile
只有当对 RDD 调用某个操作(如 wordsRDD.count)时,才会执行称为 血统的 RDD 链。也就是说,分区的数据将由 Spark 集群的执行器加载,应用 flatMap函数并计算结果。
wordsRDD.count
flatMap
在线性谱系上,如本例所示,不需要 cache()。数据将被加载到执行器,所有的转换将被应用,最后 count将被计算,所有在内存中-如果数据适合在内存中。
cache()
count
当 RDD 的谱系分支出去时,cache是有用的。假设您希望将前面示例中的单词过滤为正负单词的计数。你可以这样做:
cache
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
在这里,每个分支都会重新加载数据。添加显式的 cache语句将确保保留和重用以前完成的处理。这项工作将是这样的:
val textFile = sc.textFile("/user/emp.txt") val wordsRDD = textFile.flatMap(line => line.split("\\W")) wordsRDD.cache() val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
由于这个原因,cache被称为“断开沿袭”,因为它创建了一个可以重用于进一步处理的检查点。
经验法则: 使用 cache时,你的 RDD 分支出来谱系或当一个 RDD 被多次使用,像在一个循环。
添加添加(或临时添加) cache方法调用的另一个原因。
使用 cache方法,火花将提供关于 RDD 大小的调试信息。因此,在火花集成的用户界面,您将获得 RDD 内存消耗信息。这对诊断记忆问题很有帮助。
下面是应该缓存 RDD 的三种情况:
多次使用 RDD 在同一个 RDD 上执行多个操作 用于长链(或非常昂贵的)转换
多次使用 RDD
在同一个 RDD 上执行多个操作
用于长链(或非常昂贵的)转换