使用 park-CSV 编写单个 CSV 文件

我正在使用 https://github.com/databricks/spark-csv,我正在尝试写一个单一的 CSV,但不能,它正在制作一个文件夹。

需要一个 Scala 函数,它将接受像 path 和 file name 这样的参数并写入 CSV 文件。

379922 次浏览

它创建了一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果你需要一个单一的输出文件(仍然在一个文件夹中) ,你可以 repartition(如果上游数据很大,首选,但需要洗牌) :

df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")

coalesce:

df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")

储存前的资料框:

所有数据将写入 mydata.csv/part-00000。在使用此选项 确保您了解正在发生的事情以及将所有数据传输给一个工作人员的成本是多少之前。如果在复制时使用分散式档案系统,数据将被多次传输——首先获取到单个工作者,然后分布到存储节点。

或者,您可以让代码保持原样,然后使用通用的工具(如 catHDFS getmerge)来简单地合并所有的部分。

在保存之前重新分区/合并到1个分区(您仍然会得到一个文件夹,但它将有一个部分文件)

你可以使用 rdd.coalesce(1, true).saveAsTextFile(path)

它将数据存储为单个文件的路径/部分 -00000

这里我可能有点晚了,但是使用 coalesce(1)repartition(1)可能适用于小数据集,但是大数据集将全部抛入一个节点上的一个分区中。这可能会抛出 OOM 错误,或者最好的情况是,处理缓慢。

我强烈建议您使用 Hadoop API 中的 FileUtil.copyMerge()函数。这将把输出合并到一个文件中。

EDIT -这有效地将数据带到驱动程序而不是执行器节点。如果一个执行器比驱动程序有更多的 RAM 供使用,那么 Coalesce()就可以了。

编辑2 : 在 Hadoop 3.0中删除了 copyMerge()。关于如何使用最新版本的更多信息,请参见下面的堆栈溢出文章: 如何在 Hadoop 3.0中进行 CopyMerge?

如果您使用 HDFS 运行 Spark,我一直在通过正常编写 csv 文件并利用 HDFS 进行合并来解决这个问题。我在 Spark (1.6)中直接做到了这一点:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._


def merge(srcPath: String, dstPath: String): Unit =  {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}




val newData = << create your dataframe >>




val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName


newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()

我不记得我在哪学的这个技巧了,但它可能对你有用。

还有一种使用 Java 的方法

import java.io._


def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit)
{
val p = new java.io.PrintWriter(f);
try { op(p) }
finally { p.close() }
}


printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}

如果您正在使用 Databricks,并且可以将所有数据放入一个 worker 的 RAM 中(因此可以使用 .coalesce(1)) ,那么可以使用 dbfs 查找并移动生成的 CSV 文件:

val fileprefix= "/mnt/aws/path/file-prefix"


dataset
.coalesce(1)
.write
//.mode("overwrite") // I usually don't use this, but you may want to.
.option("header", "true")
.option("delimiter","\t")
.csv(fileprefix+".tmp")


val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
.filter(file=>file.name.endsWith(".csv"))(0).path


dbutils.fs.cp(partition_path,fileprefix+".tab")


dbutils.fs.rm(fileprefix+".tmp",recurse=true)

如果您的文件不适合工人的 RAM,您可能需要考虑 Chaotic3均衡建议使用 FileUtils.copMerge ()。我还没有这样做,也不知道是否可行,例如,在 S3上。

这个答案是基于以前对这个问题的回答以及我自己对提供的代码片段的测试构建的。我本来是寄到 Databricks 的和我在这里重新发布它。

我找到的关于 dbfs rm 递归选项的最佳文档在 一个 Databricks 论坛上。

一个适用于从 Minkymorgan 修改的 s3的解决方案。

只需将临时分区目录路径(与最终路径的名称不同)作为 srcPath传递,并将单个最终 csv/txt 作为 destPath传递,如果要删除原始目录,请指定同样的 deleteSource

/**
* Merges multiple partitions of spark text file output into single file.
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
import org.apache.hadoop.fs.FileUtil
import java.net.URI
val config = spark.sparkContext.hadoopConfiguration
val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
FileUtil.copyMerge(
fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
)
}

Spark 的 df.write() API 将在给定的路径中创建多个零件文件... ... 以强制 Spark 只写一个零件文件,使用 df.coalesce(1).write.csv(...)而不是 df.repartition(1).write.csv(...),因为合并是一种狭窄的转换,而重分区是一种宽泛的转换。参见 < a href = “ https://stackoverflow. com/a/31612810/5883310”> Spark-re 合并()

df.coalesce(1).write.csv(filepath,header=True)

将用一个 part-0001-...-c000.csv文件在给定的文件路径中创建文件夹 使用

cat filepath/part-0001-...-c000.csv > filename_you_want.csv

有一个用户友好的文件名

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

我使用下面的方法解决了这个问题(hdfs 重命名文件名) :-

步骤1:-(Crate Data Frame and write to HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

步骤2:-(创建 Hadoop 配置)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

步骤3:-(获取 hdfs 文件夹路径中的路径)

val pathFiles = new Path("/hdfsfolder/blah/")

步骤4:-(从 hdfs 文件夹获取火花文件名)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

Setp5:-(创建 scala 可变列表以保存所有文件名并将其添加到列表中)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
while (fileNames.hasNext) {
fileNamesList += fileNames.next().getPath.getName
}
println(fileNamesList)

步骤6:-(filter _ SUCESS file order from file namesscala list)

    // get files name which are not _SUCCESS
val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

步骤7:-(将 scala 列表转换为字符串,并将所需的文件名添加到 hdfs 文件夹字符串,然后应用重命名)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
hdfs.rename(partFileSourcePath , desiredCsvTargetPath)

我在 Python 中使用它来获取单个文件:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)

通过使用 Listbuffer,我们可以将数据保存到单个文件中:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
val text = spark.read.textFile("filepath")
var data = ListBuffer[String]()
for(line:String <- text.collect()){
data += line
}
val writer = new FileWriter("filepath")
data.foreach(line => writer.write(line.toString+"\n"))
writer.close()

此答案扩展了已接受的答案,提供了更多上下文,并提供了可以在计算机上的 Spark Shell 中运行的代码片段。

更多关于已接受答案的背景资料

接受的答案可能会给您这样的印象: 示例代码输出一个 mydata.csv文件,但事实并非如此。让我们来演示一下:

val df = Seq("one", "two", "three").toDF("num")
df
.repartition(1)
.write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

输出结果如下:

Documents/
tmp/
mydata.csv/
_SUCCESS
part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

注意: mydata.csv是一个文件夹在接受的答案-它不是一个文件!

如何输出具有特定名称的单个文件

我们可以使用 火花 Daria写出单个 mydata.csv文件。

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = sys.env("HOME") + "/Documents/better/staging",
filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

这会将文件输出如下:

Documents/
better/
mydata.csv

S3路径

要在 S3中使用这个方法,需要将 s3a 路径传递给 DariaWriters.writeSingleFile:

DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = "s3a://bucket/data/src",
filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

有关更多信息,请参见 给你

避免复制合并

CopyMerge 从 Hadoop 3中删除。DariaWriters.writeSingleFile实现使用 fs.rename正如这里所描述的。因此,到2020年,CopyMerge 的实现将可以工作。我不确定 Spark 什么时候会升级到 Hadoop 3,但是最好避免在 Spark 升级 Hadoop 时使用任何 copy Merge 方法,因为这会导致代码中断。

源代码

如果您想检查实现,可以在 park-daria 源代码中查找 DariaWriters对象。

PySpark 实现

使用 PySpark 更容易写出单个文件,因为可以将 DataFrame 转换为默认情况下作为单个文件写出的 Panda DataFrame。

from pathlib import Path
home = str(Path.home())
data = [
("jellyfish", "JALYF"),
("li", "L"),
("luisa", "LAS"),
(None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

限制

DariaWriters.writeSingleFile Scala 方法和 df.toPandas() Python 方法只适用于小型数据集。大型数据集不能作为单个文件写出。从性能角度来看,将数据写入单个文件并不是最佳选择,因为数据不能并行写入。

spark.sql("select * from df").coalesce(1).write.option("mode","append").option("header","true").csv("/your/hdfs/path/")

这是数据帧

coalesce(1)repartition(1)—— > 这将使您的输出文件仅为1部分文件

写入数据

将数据追加到现有目录

option("header","true")-> 启用头部

编写为 CSV 文件及其在 HDFS 中的输出位置

def export_csv(
fileName: String,
filePath: String
) = {


val filePathDestTemp = filePath + ".dir/"
val merstageout_df = spark.sql(merstageout)


merstageout_df
.coalesce(1)
.write
.option("header", "true")
.mode("overwrite")
.csv(filePathDestTemp)
  

val listFiles = dbutils.fs.ls(filePathDestTemp)


for(subFiles <- listFiles){
val subFiles_name: String = subFiles.name
if (subFiles_name.slice(subFiles_name.length() - 4,subFiles_name.length()) == ".csv") {
dbutils.fs.cp (filePathDestTemp + subFiles_name,  filePath + fileName+ ".csv")
dbutils.fs.rm(filePathDestTemp, recurse=true)
}}}