在还原阶段后合并输出文件

在 mapreduce 中,每个 reduce 任务将其输出写入一个名为 零件的文件,其中 是与 reduce 任务关联的分区 ID。映射/缩减合并这些文件吗?如果是,怎么做?

76848 次浏览

You can run an additional map/reduce task, where map and reduce don't change the data, and partitioner assigns all data to a single reducer.

No, these files are not merged by Hadoop. The number of files you get is the same as the number of reduce tasks.

If you need that as input for a next job then don't worry about having separate files. Simply specify the entire directory as input for the next job.

If you do need the data outside of the cluster then I usually merge them at the receiving end when pulling the data off the cluster.

I.e. something like this:

hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt

Instead of doing the file merging on your own, you can delegate the entire merging of the reduce output files by calling:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

Note This combines the HDFS files locally. Make sure you have enough disk space before running

Why not use a pig script like this one for merging partition files:

stuff = load "/path/to/dir/*"


store stuff into "/path/to/mergedir"

That's the function you can use to Merge Files in HDFS

public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException {
FileSystem fs = FileSystem.get(config);
Path srcPath = new Path(src);
Path dstPath = new Path(dest);


// Check if the path already exists
if (!(fs.exists(srcPath))) {
logger.info("Path " + src + " does not exists!");
return false;
}


if (!(fs.exists(dstPath))) {
logger.info("Path " + dest + " does not exists!");
return false;
}
return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null);
}

For text files only and HDFS as both the source and destination, use the below command:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

This will concatenate all the files in input_hdfs_dir and will write the output back to HDFS at output_hdfs_file. Do keep in mind that all the data will be brought back to the local system and then again uploaded to hdfs, although no temporary files are created and this happens on the fly using UNIX pe.

Also, this won't work with non-text files such as Avro, ORC etc.

For binary files, you could do something like this (if you have Hive tables mapped on the directories):

insert overwrite table tbl select * from tbl

Depending on your configuration, this could also create more than files. To create a single file, either set the number of reducers to 1 explicitly using mapreduce.job.reduces=1 or set the hive property as hive.merge.mapredfiles=true.

The part-r-nnnnn files are generated after the reduce phase designated by 'r' in between. Now the fact is if you have one reducer running, you will have an output file like part-r-00000. If the number of reducers are 2 then you're going to have part-r-00000 and part-r-00001 and so on. Look, if the output file is too large to fit into the machine memory since the hadoop framework has been designed to run on Commodity Machines, then the file gets splitted. As per the MRv1, you have a limit of 20 reducers to work on your logic. You may have more but the same needs to be customised in the configuration files mapred-site.xml. Talking about your question; you may either use getmerge or you may set the number of reducers to 1 by embedding the following statement to the driver code

job.setNumReduceTasks(1);

Hope this answers your question.

Besides my previous answer I have one more answer for you which I was trying few minutes ago. You may use CustomOutputFormat which looks like the code given below

public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> {


@Override
public RecordWriter<StudentKey,PassValue> getRecordWriter(
TaskAttemptContext tac) throws IOException, InterruptedException {
//step 1: GET THE CURRENT PATH
Path currPath=FileOutputFormat.getOutputPath(tac);


//Create the full path
Path fullPath=new Path(currPath,"Aniruddha.txt");


//create the file in the file system
FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
FSDataOutputStream fileOut=fs.create(fullPath,tac);
return new VictorRecordWriter(fileOut);
}


}

Just, have a look at the fourth line from the last. I have used my own name as the output file name and I have tested the program with 15 reducers. Still the File remains the same. So getting a single out file instead of two or more is possible yet to be very clear the size of the output file must not exceed the size of the primary memory i.e. the output file must fit into the memory of the commodity machine else there might be a problem with the output file split. Thanks!!

If the files have header, you can get rid of it of by doing this:

hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv

then add the header manually for output.csv

. Does map/reduce merge these files?

No. It does not merge.

You can use IdentityReducer to achieve your goal.

Performs no reduction, writing all input values directly to the output.

public void reduce(K key,
Iterator<V> values,
OutputCollector<K,V> output,
Reporter reporter)
throws IOException

Writes all keys and values directly to output.

Have a look at related SE posts:

hadoop: difference between 0 reducer and identity reducer?