如何覆盖火花中的输出目录

我有一个火花流应用程序,每分钟产生一个数据集。 我需要保存/覆盖处理数据的结果。

当我试图覆盖数据集 org.apache.hadoop.mapred. FileAlreadyISTsException 时,会停止执行。

我设置了 Spark 属性 set("spark.files.overwrite","true"),但是没有运气。

如何覆盖或预删除文件从火花?

286646 次浏览

参数 spark.files.overwrite的文档说: “当目标文件存在且其内容与源文件不匹配时,是否覆盖通过 SparkContext.addFile()添加的文件。”因此它对 saveAsTextFiles 方法没有影响。

您可以在保存文件之前这样做:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas 在这里解释道: Http://apache-spark-user-list.1001560.n3.nabble.com/how-can-i-make-spark-1-0-saveastextfile-to-overwrite-existing-file-td6696.html

更新: 建议使用 Dataframes,加上类似 ... .write.mode(SaveMode.Overwrite) ...的东西。

皮条客:

implicit class PimpedStringRDD(rdd: RDD[String]) {
def write(p: String)(implicit ss: SparkSession): Unit = {
import ss.implicits._
rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
}
}

对于旧版本,可以试试

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

在1.1.0中,您可以使用带有—— conf 标志的星火提交脚本来设置 conf 设置。

警告(旧版本) : 根据@Piggybox 的说法,Spark 有一个 bug,它只会覆盖写 part-文件所需的文件,其他文件都不会被删除。

Sql.DataFrame.save文档(目前为1.3.1)中,可以在保存 DataFrame 时指定 mode='overwrite':

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

我已经验证了这甚至可以删除剩余的分区文件。因此,如果最初有10个分区/文件,但后来用只有6个分区的 DataFrame 覆盖了该文件夹,则生成的文件夹将包含6个分区/文件。

有关模式选项的更多信息,请参见 Spark SQL 文档

这个重载版本的 拯救函数适合我:

YourDF.save (outputPath,org.apache.spot.sql.SaveMode.valueOf (“ Overwrite”))

上面的例子覆盖了一个现有的文件夹:

Append : Append 模式意味着在将 DataFrame 保存到数据源时,如果数据/表已经存在,那么 DataFrame 的内容应该被附加到现有数据中。

ErrorIfExists : ErrorIfExists 模式意味着在将 DataFrame 保存到数据源时,如果数据已经存在,则会抛出异常。

Ignore : Ignore 模式意味着在将 DataFrame 保存到数据源时,如果数据已经存在,则预计保存操作不会保存 DataFrame 的内容,也不会更改现有数据。

如果您愿意使用自己的自定义输出格式,那么您也可以使用 RDD 获得所需的行为。

看看下面的课程: FileOutputFormat , 文件输出提交器

在文件输出格式中,有一个名为 checkOutputSpecs 的方法,它检查输出目录是否存在。 在 FileOutputCommittee 中,您拥有 committee Job,它通常将数据从临时目录传输到其最终位置。

我还不能验证它(只要我有几分钟的空闲时间,我就会这么做) ,但是理论上来说: 如果我扩展 FileOutputFormat 并覆盖 checkOutputSpecs 到一个不会在已经存在的目录上抛出异常的方法,并且调整我的自定义输出提交器的 committee Job 方法来执行任何我想要的逻辑(例如覆盖一些文件,附加其他文件) ,那么我也许能够在 RDD 上实现我想要的行为。

输出格式传递给: saveAsNewAPIHadoopFile (也是调用 saveAsTextFile 来实际保存文件的方法)。而 Output 提交器是在应用程序级别配置的。

由于 df.save(path, source, mode)已被弃用,(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)

使用 df.write.format(source).mode("overwrite").save(path)
Write 在哪里是 DataFrameWriter

‘ source’可以是(“ com.database ricks.spot.avro”| “ parquet”| “ json”)

如果您想使用 python 覆盖 parquet 文件,则 df.write.mode('overwrite').parquet("/output/folder/path")可以工作。这是在火花1.6.2。API 在以后的版本中可能会有所不同

  val jobName = "WordCount";
//overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
val conf = new
SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
val sc = new SparkContext(conf)

Spark-覆盖输出目录:

默认情况下,Spark 不会覆盖 S3、 HDFS 和任何其他文件系统上的输出目录,因此,当您尝试将 DataFrame 内容写入现有目录时,Spark 将返回运行时错误。为了克服这个问题,Spark 提供了一个枚举 org.apache.parks. sql。SaveMode.覆盖以覆盖现有文件夹。

例如,我们需要使用这个 Overwrite 作为 DataFrameWrite 类的 mode ()函数的参数。

Mode (SaveMode. Overwrite) . csv (“/tmp/out/foldername”)

或者可以使用覆盖字符串。

Mode (“ overwrite”) . csv (“/tmp/out/foldername”)

除了 Overwrite,SaveMode 还提供其他模式,如 SaveMode.Append、 SaveMode.ErrorIfExists 和 SaveMode.Ignore

对于 Spark 的旧版本,可以使用以下内容用 RDD 内容覆盖输出目录。

Set (“火花.hadoop.validateOutputSpecs”,“ false”) Val parks Context = SparkContext (parks Conf)