how to make saveAsTextFile NOT split output into multiple file?

When using Scala in Spark, whenever I dump the results out using saveAsTextFile, it seems to split the output into multiple parts. I'm just passing a parameter(path) to it.

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. Does the number of outputs correspond to the number of reducers it uses?
  2. Does this mean the output is compressed?
  3. I know I can combine the output together using bash, but is there an option to store the output in a single text file, without splitting?? I looked at the API docs, but it doesn't say much about this.
107428 次浏览

The reason it saves it as multiple files is because the computation is distributed. If the output is small enough such that you think you can fit it on one machine, then you can end your program with

val arr = year.collect()

And then save the resulting array as a file, Another way would be to use a custom partitioner, partitionBy, and make it so everything goes to one partition though that isn't advisable because you won't get any parallelization.

If you require the file to be saved with saveAsTextFile you can use coalesce(1,true).saveAsTextFile(). This basically means do the computation then coalesce to 1 partition. You can also use repartition(1) which is just a wrapper for coalesce with the shuffle argument set to true. Looking through the source of RDD.scala is how I figured most of this stuff out, you should take a look.

You could call coalesce(1) and then saveAsTextFile() - but it might be a bad idea if you have a lot of data. Separate files per split are generated just like in Hadoop in order to let separate mappers and reducers write to different files. Having a single output file is only a good idea if you have very little data, in which case you could do collect() as well, as @aaronman said.

You will be able to do it in the next version of Spark, in the current version 1.0.0 it's not possible unless you do it manually somehow, for example, like you mentioned, with a bash script call.

I also want to mention that the documentation clearly states that users should be careful when calling coalesce with a real small number of partitions . this can cause upstream partitions to inherit this number of partitions.

I would not recommend using coalesce(1) unless really required.

In Spark 1.6.1 the format is as shown below. It creates a single output file.It is best practice to use it if the output is small enough to handle.Basically what it does is that it returns a new RDD that is reduced into numPartitions partitions.If you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1)

pair_result.coalesce(1).saveAsTextFile("/app/data/")

As others have mentioned, you can collect or coalesce your data set to force Spark to produce a single file. But this also limits the number of Spark tasks that can work on your dataset in parallel. I prefer to let it create a hundred files in the output HDFS directory, then use hadoop fs -getmerge /hdfs/dir /local/file.txt to extract the results into a single file in the local filesystem. This makes the most sense when your output is a relatively small report, of course.

Here's my answer to output a single file. I just added coalesce(1)

val year = sc.textFile("apat63_99.txt")
.map(_.split(",")(1))
.flatMap(_.split(","))
.map((_,1))
.reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

Code:

year.coalesce(1).saveAsTextFile("year")

You can call repartition() and follow this way:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)


var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

enter image description here

For those working with a larger dataset:

  • rdd.collect() should not be used in this case as it will collect all data as an Array in the driver, which is the easiest way to get out of memory.

  • rdd.coalesce(1).saveAsTextFile() should also not be used as the parallelism of upstream stages will be lost to be performed on a single node, where data will be stored from.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() is the best simple option as it will keep the processing of upstream tasks parallel and then only perform the shuffle to one node (rdd.repartition(1).saveAsTextFile() is an exact synonym).

  • rdd.saveAsSingleTextFile() as provided bellow additionally allows one to store the rdd in a single file with a specific name while keeping the parallelism properties of rdd.coalesce(1, shuffle = true).saveAsTextFile().


Something that can be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") is that it actually produces a file whose path is path/to/file.txt/part-00000 and not path/to/file.txt.

The following solution rdd.saveAsSingleTextFile("path/to/file.txt") will actually produce a file whose path is path/to/file.txt:

package com.whatever.package


import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec


object SparkHelper {


// This is an implicit class so that saveAsSingleTextFile can be attached to
// SparkContext and be called like this: sc.saveAsSingleTextFile
implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {


def saveAsSingleTextFile(path: String): Unit =
saveAsSingleTextFileInternal(path, None)


def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
saveAsSingleTextFileInternal(path, Some(codec))


private def saveAsSingleTextFileInternal(
path: String, codec: Option[Class[_ <: CompressionCodec]]
): Unit = {


// The interface with hdfs:
val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)


// Classic saveAsTextFile in a temporary folder:
hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
codec match {
case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
case None        => rdd.saveAsTextFile(s"$path.tmp")
}


// Merge the folder of resulting part-xxxxx into one file:
hdfs.delete(new Path(path), true) // to make sure it's not there already
FileUtil.copyMerge(
hdfs, new Path(s"$path.tmp"),
hdfs, new Path(path),
true, rdd.sparkContext.hadoopConfiguration, null
)
// Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144


hdfs.delete(new Path(s"$path.tmp"), true)
}
}
}

which can be used this way:

import com.whatever.package.SparkHelper.RDDExtensions


rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

This snippet:

  • First stores the rdd with rdd.saveAsTextFile("path/to/file.txt") in a temporary folder path/to/file.txt.tmp as if we didn't want to store data in one file (which keeps the processing of upstream tasks parallel)

  • And then only, using the hadoop file system api, we proceed with the merge (FileUtil.copyMerge()) of the different output files to create our final output single file path/to/file.txt.