How does Hadoop process records split across block boundaries?

According to Hadoop: The Definitive Guide:

The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS boundaries more often than not. This has no bearing on the functioning of your program—lines are not missed or broken, for example—but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.

Suppose a record line is split across two blocks (b1 and b2). The mapper processing the first block (b1) will notice that the last line doesn't have an EOL separator and will fetch the remainder of the line from the next block of data (b2).

How does the mapper processing the second block (b2) determine that the first record is incomplete and should process starting from the second record in the block (b2)?


I see it as follows: InputFormat is responsible for splitting the data into logical splits, taking the nature of the data into account.
Nothing prevents it from doing so, although it can add significant latency to the job - all the logic and reading around the desired split-size boundaries would happen in the jobtracker.
The simplest record-aware input format is TextInputFormat. It works as follows (as far as I understand from the code): the input format creates splits by size, regardless of the lines, but the LineRecordReader always:
a) Skips the first line in the split (or part of it), if it is not the first split
b) Reads one line past the boundary of the split at the end (if data is available, i.e. it is not the last split). A small standalone simulation of these two rules follows below.
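
Here is a toy simulation of just those two rules in plain Java (not Hadoop code; the data, split size and helper names are made up for illustration). Every line ends up in exactly one split's output, even though the split boundaries cut lines in the middle:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSimulation {

        // Simulates one reader processing the split [start, start + length).
        static List<String> readSplit(byte[] data, int start, int length) {
            List<String> records = new ArrayList<>();
            int end = Math.min(start + length, data.length);
            int pos = start;
            // Rule (a): if this is not the first split, skip up to and including
            // the first '\n' found from (start - 1), leaving the partial line to
            // the previous reader. If data[start - 1] is already '\n', the split
            // begins exactly on a line start and nothing is skipped.
            if (start != 0) {
                while (pos < data.length && data[pos - 1] != '\n') {
                    pos++;
                }
            }
            // Rule (b): keep emitting lines as long as the line *starts* inside
            // the split; the last line may run past `end` (the "remote read").
            while (pos < data.length && pos < end) {
                int lineStart = pos;
                while (pos < data.length && data[pos] != '\n') {
                    pos++;
                }
                pos++; // consume the '\n'
                int lineEnd = Math.min(pos, data.length);
                records.add(new String(data, lineStart, lineEnd - lineStart).trim());
            }
            return records;
        }

        public static void main(String[] args) {
            byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes();
            int splitSize = 8; // deliberately cuts lines in the middle
            for (int start = 0; start < data.length; start += splitSize) {
                System.out.println("split@" + start + " -> " + readSplit(data, start, splitSize));
            }
        }
    }

Running it prints alpha and bravo for the first split, charlie for the second, delta for the third and nothing for the last, so no line is lost or read twice.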

From what I've understood, when the FileSplit is initialized for the first block, the default constructor is called, so the values of start and length are zero initially. By the end of processing the first block, if the last line is incomplete, then the value of length will be greater than the length of the split and it will read the first line of the next block as well. Due to this, the value of start for the second block will be greater than zero, and under this condition the LineRecordReader will skip the first line of the second block. (See the source.)

If the last line of the first block is complete, then the value of length will equal the length of the first block and the value of start for the second block will be zero. In that case, the LineRecordReader will not skip the first line and will read the second block from the beginning.

Does that make sense?

Interesting question. I spent some time looking at the code for the details, and here are my thoughts. The splits are handled by the client via InputFormat.getSplits, so a look at FileInputFormat gives the following info:

  • For each input file, get the file length and the block size and compute the split size as max(minSize, min(maxSize, blockSize)), where maxSize corresponds to mapred.max.split.size and minSize to mapred.min.split.size.
  • Divide the file into different FileSplits based on the split size calculated above. What's important here is that each FileSplit is initialized with a start parameter corresponding to the offset in the input file. There is still no handling of the lines at that point. The relevant part of the code looks like this (a small standalone sketch of the same arithmetic follows the snippet):

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
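
To make that arithmetic concrete, here is a small standalone sketch (plain Java, no Hadoop dependency; the 200MB file size, 64MB block size and default-looking min/max split sizes are assumptions for illustration). It applies the same formula and the same loop shape, and prints exactly the split offsets used in the example further down:

    public class SplitSizeSketch {
        // Mirrors the formula above: splitSize = max(minSize, min(maxSize, blockSize)).
        static long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return Math.max(minSize, Math.min(maxSize, blockSize));
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            long length = 200 * mb;         // hypothetical 200MB input file
            long blockSize = 64 * mb;       // HDFS block size
            long minSize = 1;               // assumed mapred.min.split.size
            long maxSize = Long.MAX_VALUE;  // assumed mapred.max.split.size
            double splitSlop = 1.1;         // same slack factor as SPLIT_SLOP above

            long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 64MB here
            long bytesRemaining = length;
            // Same structure as the loop above: each split starts at
            // (length - bytesRemaining); line boundaries are never looked at.
            while (((double) bytesRemaining) / splitSize > splitSlop) {
                System.out.println("split start=" + (length - bytesRemaining) / mb
                        + "MB length=" + splitSize / mb + "MB");
                bytesRemaining -= splitSize;
            }
            if (bytesRemaining != 0) {
                System.out.println("split start=" + (length - bytesRemaining) / mb
                        + "MB length=" + bytesRemaining / mb + "MB");
            }
        }
    }

The output is four splits starting at 0MB, 64MB, 128MB and 192MB: the first three of length 64MB and the last of 8MB.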
    

After that, if you look at the LineRecordReader which is defined by the TextInputFormat, that's where the lines are handled:

  • When you initialize your LineRecordReader it tries to instantiate a LineReader which is an abstraction to be able to read lines over FSDataInputStream. There are 2 cases:
  • If there is a CompressionCodec defined, then this codec is responsible for handling boundaries. Probably not relevant to your question.
  • If there is no codec, however, that's where things get interesting: if the start of your InputSplit is different from 0, then you backtrack 1 character and skip the first line you encounter, identified by \n or \r\n (Windows). The backtrack is important because, in case your line boundaries are the same as your split boundaries, it ensures you do not skip a valid line. Here is the relevant code (a tiny illustration of the backtrack follows the snippet):

    if (codec != null) {
      in = new LineReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) {
        skipFirstLine = true;
        --start;
        fileIn.seek(start);
      }
      in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
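
To see why the backtrack matters when a split boundary happens to fall exactly on the start of a line, here is a tiny standalone illustration (plain Java, not Hadoop code; the data and offsets are made up):

    public class BacktrackSketch {
        public static void main(String[] args) {
            byte[] data = "AAAA\nBBBB\nCCCC\n".getBytes();
            int start = 5;  // split boundary lands exactly on the start of "BBBB"

            // Same idea as the snippet above: back up one byte, then skip
            // through the first newline found from there.
            int pos = start - 1;            // --start; fileIn.seek(start)
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++;                          // consume the '\n'

            // Because the byte at start-1 is the '\n' of the previous line,
            // the "skipped first line" is just that single newline, and
            // reading resumes at offset 5, so "BBBB" is not lost.
            System.out.println("reading resumes at offset " + pos); // prints 5
        }
    }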
    

So, since the splits are calculated in the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard its first line or not.

So basically, if you have 2 lines of 100MB each in the same file and, to simplify, let's say the split size is 64MB, then when the input splits are calculated we will have the following scenario:

  • Split 1 containing the path and the hosts of this block, initialized at start 0MB (200-200), length 64MB.
  • Split 2 initialized at start 64MB (200-136), length 64MB.
  • Split 3 initialized at start 128MB (200-72), length 64MB.
  • Split 4 initialized at start 192MB (200-8), length 8MB.
  • Mapper A will process split 1; start is 0, so it doesn't skip the first line, and it reads a full line that goes beyond the 64MB limit, so it needs remote reads.
  • Mapper B will process split 2; start is != 0, so it skips the first line after 64MB-1 byte, which ends at 100MB (the end of line 1), still within split 2. 28MB of line 2 are in split 2, so the remaining 72MB are read remotely.
  • Mapper C will process split 3; start is != 0, so it skips the first line after 128MB-1 byte, which ends at 200MB (the end of line 2). That is the end of the file, so it doesn't do anything.
  • Mapper D is the same as Mapper C, except that it looks for a newline after 192MB-1 byte.

The mappers don't need to communicate. The file blocks are in HDFS, and the current mapper's RecordReader can read the block that contains the remainder of the line. This happens behind the scenes.

In the Hadoop source code of LineRecordReader.java, in the constructor, I found these comments:

    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;

From this I believe that Hadoop reads one extra line for each split (at the end of the current split, it reads the next line, which lives in the next split), and if it is not the first split, the first line is thrown away. That way no line record is lost or left incomplete.
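
You can observe this yourself with a minimal sketch along the following lines (assuming the newer org.apache.hadoop.mapreduce API on the classpath; the file path and the split offsets are placeholders passed on the command line). It runs a LineRecordReader over an arbitrary split of a local text file and prints each record's starting byte offset, so you can watch the partial first line being skipped and the last line being read past the split end:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ReaderDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path file = new Path(args[0]);              // placeholder: a local text file
            long splitStart = Long.parseLong(args[1]);  // placeholder: split start offset
            long splitLength = Long.parseLong(args[2]); // placeholder: split length

            FileSplit split = new FileSplit(file, splitStart, splitLength, null);
            LineRecordReader reader = new LineRecordReader();
            reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
            while (reader.nextKeyValue()) {
                // The key is the byte offset of the line within the whole file.
                System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
            }
            reader.close();
        }
    }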

The MapReduce algorithm does not work on the physical blocks of the file. It works on logical input splits. The input split depends on where the records were written: a record may span two Mappers.

The way HDFS is set up, it breaks very large files into large blocks (for example, of size 128MB) and stores three copies of these blocks on different nodes in the cluster.

HDFS has no awareness of the content of these files. A record may have started in Block-a, but the end of that record may be present in Block-b.

To solve this problem, Hadoop uses a logical representation of the data stored in file blocks, known as input splits. When a MapReduce job client calculates the input splits, it figures out where the first whole record in a block begins and where the last record in the block ends.

The key point:

In cases where the last record in a block is incomplete, the input split includes location information for the next block and the byte offset of the data needed to complete the record.

Have a look at the diagram below.

[diagram illustrating how input splits line up with HDFS blocks]

Have a look at this article and the related SE question: About Hadoop/HDFS file splitting.

More details can be read from the documentation.

The Map-Reduce framework relies on the InputFormat of the job to:

  1. Validate the input specification of the job.
  2. Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
  3. Each InputSplit is then assigned to an individual Mapper for processing. A split could be a tuple. InputSplit[] getSplits(JobConf job, int numSplits) is the API that takes care of these things.

FileInputFormat, which extends InputFormat, implements the getSplits() method; have a look at the internals of this method on grepcode: http://grepcode.com/file_/repo1.maven.org/maven2/com.ning/metrics.action/0.2.7/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java/?v=source
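
If you want to see what getSplits() returns for your own input, a minimal sketch like the one below (using the old org.apache.hadoop.mapred API that the signature above refers to; the input path and the numSplits hint of 4 are placeholders) prints each split's path, start offset and length:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class PrintSplits {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();
            FileInputFormat.setInputPaths(conf, new Path(args[0])); // placeholder input path
            TextInputFormat format = new TextInputFormat();
            format.configure(conf);
            // numSplits is only a hint; the actual count is driven by the block
            // size and the min/max split size settings discussed above.
            for (InputSplit split : format.getSplits(conf, 4)) {
                System.out.println(split); // FileSplit prints path:start+length
            }
        }
    }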