在node.js?中一次读一行文件

我试图一次读取一行大文件。我找到了处理该主题的Quora上的一个问题,但我缺少一些连接来使整个事情适合在一起。

 var Lazy=require("lazy");
new Lazy(process.stdin)
.lines
.forEach(
function(line) {
console.log(line.toString());
}
);
process.stdin.resume();

我想弄清楚的一点是如何从文件中一次读取一行,而不是像本示例中那样读取STDIN。

我试过:

 fs.open('./VeryBigFile.csv', 'r', '0666', Process);


function Process(err, fd) {
if (err) throw err;
// DO lazy read
}

但它不起作用。我知道在紧要关头我可以回到使用PHP之类的东西,但我想弄清楚这一点。

我不认为另一个答案会起作用,因为文件比我运行它的服务器的内存要大得多。

655373 次浏览

您不必将文件open,而是必须创建一个ReadStream

fs.createReadStream

然后将该流传递给Lazy

使用载波模块

var carrier = require('carrier');


process.stdin.resume();
carrier.carry(process.stdin, function(line) {
console.log('got one line: ' + line);
});

由于节点中的排水/暂停/恢复的工作方式,我最终使用Lazy逐行读取大量内存泄漏,然后试图处理这些行并将它们写入另一个流(参见:http://elegantcode.com/2011/04/06/taking-baby-steps-with-node-js-pumping-data-between-streams/(顺便说一句,我喜欢这家伙))。我还没有足够仔细地观察Lazy来理解到底为什么,但是我不能暂停我的读取流,以便在Lazy不退出的情况下进行排水。

我编写了将大量csv文件处理为xml文档的代码,您可以在此处看到代码:https://github.com/j03m/node-csv2xml

如果您使用Lazy行运行以前的版本,它会泄漏。最新版本根本不会泄漏,您可以将其用作阅读器/处理器的基础。虽然我在那里有一些自定义的东西。

编辑:我想我还应该注意到,我使用Lazy的代码工作得很好,直到我发现自己编写了足够大的xml片段,这些片段因为需要而耗尽/暂停/恢复。对于较小的块来说,这很好。

function createLineReader(fileName){
var EM = require("events").EventEmitter
var ev = new EM()
var stream = require("fs").createReadStream(fileName)
var remainder = null;
stream.on("data",function(data){
if(remainder != null){//append newly received data chunk
var tmp = new Buffer(remainder.length+data.length)
remainder.copy(tmp)
data.copy(tmp,remainder.length)
data = tmp;
}
var start = 0;
for(var i=0; i<data.length; i++){
if(data[i] == 10){ //\n new line
var line = data.slice(start,i)
ev.emit("line", line)
start = i+1;
}
}
if(start<data.length){
remainder = data.slice(start);
}else{
remainder = null;
}
})


stream.on("end",function(){
if(null!=remainder) ev.emit("line",remainder)
})


return ev
}




//---------main---------------
fileName = process.argv[2]


lineReader = createLineReader(fileName)
lineReader.on("line",function(line){
console.log(line.toString())
//console.log("++++++++++++++++++++")
})

编辑:

使用转换流


使用BufferedReader,您可以读取行。

new BufferedReader ("lorem ipsum", { encoding: "utf8" })
.on ("error", function (error){
console.log ("error: " + error);
})
.on ("line", function (line){
console.log ("line: " + line);
})
.on ("end", function (){
console.log ("EOF");
})
.read ();

有一个非常好的模块用于逐行读取文件,它被称为行阅读器

有了它,你只需写下:

var lineReader = require('line-reader');


lineReader.eachLine('file.txt', function(line, last) {
console.log(line);
// do whatever you want with line...
if(last){
// or check if it's the last one
}
});

如果您需要更多控制,您甚至可以使用“java-style”界面迭代文件:

lineReader.open('file.txt', function(reader) {
if (reader.hasNextLine()) {
reader.nextLine(function(line) {
console.log(line);
});
}
});

您可以随时滚动自己的行阅读器。我还没有对这个片段进行基准测试,但它正确地将传入的块流拆分为没有尾随'\n'的行

var last = "";


process.stdin.on('data', function(chunk) {
var lines, i;


lines = (last+chunk).split("\n");
for(i = 0; i < lines.length - 1; i++) {
console.log("line: " + lines[i]);
}
last = lines[i];
});


process.stdin.on('end', function() {
console.log("line: " + last);
});


process.stdin.resume();

我确实在处理一个快速日志解析脚本时想出了这个问题,该脚本需要在日志解析期间积累数据,我觉得尝试使用js和node而不是使用perl或bash来做到这一点会很好。

无论如何,我确实觉得小型nodejs脚本应该是自包含的,而不是依赖于第三方模块,所以在阅读了这个问题的所有答案之后,每个都使用各种模块来处理行解析,一个13 SLOC原生nodejs解决方案可能会感兴趣。

我想解决同样的问题,基本上是Perl中的问题:

while (<>) {
process_line($_);
}

我的用例只是一个独立的脚本,而不是服务器,所以同步很好。这些是我的标准:

  • 可以在许多项目中重用的最小同步代码。
  • 对文件大小或行数没有限制。
  • 对行的长度没有限制。
  • 能够处理UTF-8中的完整Unicode,包括BMP之外的字符。
  • 能够处理*nix和Windows行结尾(我不需要老式Mac)。
  • 行尾要包含在行中的字符。
  • 能够处理有或没有行尾字符的最后一行。
  • 不要使用node.js发行版中未包含的任何外部库。

这是一个让我了解node.js中低级脚本类型代码的项目,并决定它作为Perl等其他脚本语言的替代品的可行性。

经过惊人的努力和几次错误的开始,这是我想出的代码。它非常快,但比我预期的要简单:(在GitHub上分叉)

var fs            = require('fs'),
StringDecoder = require('string_decoder').StringDecoder,
util          = require('util');


function lineByLine(fd) {
var blob = '';
var blobStart = 0;
var blobEnd = 0;


var decoder = new StringDecoder('utf8');


var CHUNK_SIZE = 16384;
var chunk = new Buffer(CHUNK_SIZE);


var eolPos = -1;
var lastChunk = false;


var moreLines = true;
var readMore = true;


// each line
while (moreLines) {


readMore = true;
// append more chunks from the file onto the end of our blob of text until we have an EOL or EOF
while (readMore) {


// do we have a whole line? (with LF)
eolPos = blob.indexOf('\n', blobStart);


if (eolPos !== -1) {
blobEnd = eolPos;
readMore = false;


// do we have the last line? (no LF)
} else if (lastChunk) {
blobEnd = blob.length;
readMore = false;


// otherwise read more
} else {
var bytesRead = fs.readSync(fd, chunk, 0, CHUNK_SIZE, null);


lastChunk = bytesRead !== CHUNK_SIZE;


blob += decoder.write(chunk.slice(0, bytesRead));
}
}


if (blobStart < blob.length) {
processLine(blob.substring(blobStart, blobEnd + 1));


blobStart = blobEnd + 1;


if (blobStart >= CHUNK_SIZE) {
// blobStart is in characters, CHUNK_SIZE is in octets
var freeable = blobStart / CHUNK_SIZE;


// keep blob from growing indefinitely, not as deterministic as I'd like
blob = blob.substring(CHUNK_SIZE);
blobStart -= CHUNK_SIZE;
blobEnd -= CHUNK_SIZE;
}
} else {
moreLines = false;
}
}
}

它可能会被进一步清理,这是试验和错误的结果。

我使用这个:

function emitLines(stream, re){
re = re && /\n/;
var buffer = '';


stream.on('data', stream_data);
stream.on('end', stream_end);


function stream_data(data){
buffer += data;
flush();
}//stream_data


function stream_end(){
if(buffer) stream.emmit('line', buffer);
}//stream_end




function flush(){
var re = /\n/;
var match;
while(match = re.exec(buffer)){
var index = match.index + match[0].length;
stream.emit('line', buffer.substring(0, index));
buffer = buffer.substring(index);
re.lastIndex = 0;
}
}//flush


}//emitLines

在流上使用此函数并监听将发出的行事件。

gr-

对于这样一个简单的操作,不应该有任何对第三方模块的依赖。

var fs = require('fs'),
readline = require('readline');


var rd = readline.createInterface({
input: fs.createReadStream('/path/to/file'),
output: process.stdout,
console: false
});


rd.on('line', function(line) {
console.log(line);
});
require('fs').readFileSync('file.txt', 'utf-8').split(/\r?\n/).forEach(function(line){
console.log(line);
})

老话题,但这是有效的:

var rl = readline.createInterface({
input : fs.createReadStream('/path/file.txt'),
output: process.stdout,
terminal: false
})
rl.on('line',function(line){
console.log(line) //or parse line
})

简单。不需要外部模块。

我对这个问题缺乏全面的解决方案感到沮丧,所以我整理了自己的尝试(git/npm)。复制粘贴的功能列表:

  • 交互式行处理(基于回调,无需将整个文件加载到RAM中)
  • 可选地,返回数组中的所有行(详细或原始模式)
  • 交互式中断流,或执行map/filter等处理
  • 检测任何换行符约定(PC/Mac/Linux)
  • 正确的eof/最后一行处理
  • 正确处理多字节UTF-8字符
  • 每行检索字节偏移量和字节长度信息
  • 随机访问,使用基于行或基于字节的偏移
  • 自动映射线偏移信息,以加快随机访问
  • 零依赖
  • 测试

NIH?你决定:-)

我有一个小模块可以很好地做到这一点,并且被许多其他项目使用npm readline注意,在节点v10中有一个本机readline模块,所以我将我的模块重新发布为linebylinehttps://www.npmjs.com/package/linebyline

如果您不想使用该模块,则功能非常简单:

var fs = require('fs'),
EventEmitter = require('events').EventEmitter,
util = require('util'),
newlines = [
13, // \r
10  // \n
];
var readLine = module.exports = function(file, opts) {
if (!(this instanceof readLine)) return new readLine(file);


EventEmitter.call(this);
opts = opts || {};
var self = this,
line = [],
lineCount = 0,
emit = function(line, count) {
self.emit('line', new Buffer(line).toString(), count);
};
this.input = fs.createReadStream(file);
this.input.on('open', function(fd) {
self.emit('open', fd);
})
.on('data', function(data) {
for (var i = 0; i < data.length; i++) {
if (0 <= newlines.indexOf(data[i])) { // Newline char was found.
lineCount++;
if (line.length) emit(line, lineCount);
line = []; // Empty buffer.
} else {
line.push(data[i]); // Buffer new line data.
}
}
}).on('error', function(err) {
self.emit('error', err);
}).on('end', function() {
// Emit last line if anything left over since EOF won't trigger it.
if (line.length){
lineCount++;
emit(line, lineCount);
}
self.emit('end');
}).on('close', function() {
self.emit('close');
});
};
util.inherits(readLine, EventEmitter);

基于发电机的线路阅读器:https://github.com/neurosnap/gen-readlines

var fs = require('fs');
var readlines = require('gen-readlines');


fs.open('./file.txt', 'r', function(err, fd) {
if (err) throw err;
fs.fstat(fd, function(err, stats) {
if (err) throw err;


for (var line of readlines(fd, stats.size)) {
console.log(line.toString());
}


});
});

如果您想逐行读取文件并将其写入另一个文件:

var fs = require('fs');
var readline = require('readline');
var Stream = require('stream');


function readFileLineByLine(inputFile, outputFile) {


var instream = fs.createReadStream(inputFile);
var outstream = new Stream();
outstream.readable = true;
outstream.writable = true;


var rl = readline.createInterface({
input: instream,
output: outstream,
terminal: false
});


rl.on('line', function (line) {
fs.appendFileSync(outputFile, line + '\n');
});
};

从Node.jsv0.12和Node.jsv4.0.0开始,有一个稳定的readline核心模块。这是从文件中读取行的最简单方法,无需任何外部模块:

const fs = require('fs');
const readline = require('readline');


async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');


const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.


for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}


processLineByLine();

或者:

var lineReader = require('readline').createInterface({
input: require('fs').createReadStream('file.in')
});


lineReader.on('line', function (line) {
console.log('Line from file:', line);
});

最后一行被正确读取(从Node v0.12或更高版本开始),即使没有最终的\n

更新:这个例子是添加到Node的API官方留档

自从发布我的原始答案以来,我发现分裂是一个非常易于使用的节点模块,用于文件中的行读取;它还接受可选参数。

var split = require('split');
fs.createReadStream(file)
.pipe(split())
.on('data', function (line) {
//each chunk now is a seperate line!
});

没有在非常大的文件上进行测试。如果您这样做,请告诉我们。

var fs = require('fs');


function readfile(name,online,onend,encoding) {
var bufsize = 1024;
var buffer = new Buffer(bufsize);
var bufread = 0;
var fd = fs.openSync(name,'r');
var position = 0;
var eof = false;
var data = "";
var lines = 0;


encoding = encoding || "utf8";


function readbuf() {
bufread = fs.readSync(fd,buffer,0,bufsize,position);
position += bufread;
eof = bufread ? false : true;
data += buffer.toString(encoding,0,bufread);
}


function getLine() {
var nl = data.indexOf("\r"), hasnl = nl !== -1;
if (!hasnl && eof) return fs.closeSync(fd), online(data,++lines), onend(lines);
if (!hasnl && !eof) readbuf(), nl = data.indexOf("\r"), hasnl = nl !== -1;
if (!hasnl) return process.nextTick(getLine);
var line = data.substr(0,nl);
data = data.substr(nl+1);
if (data[0] === "\n") data = data.substr(1);
online(line,++lines);
process.nextTick(getLine);
}
getLine();
}

我遇到了同样的问题,并提出了上述解决方案 看起来与其他人相似,但它是同步的,可以非常快速地读取大文件

希望这能有所帮助

在大多数情况下,这应该足够了:

const fs = require("fs")


fs.readFile('./file', 'utf-8', (err, file) => {
const lines = file.split('\n')


for (let line of lines)
console.log(line)
});

虽然你可能应该像上面的答案建议的那样使用readline模块,但readline似乎面向命令行界面而不是行读取。它在缓冲方面也有点不透明。(任何需要面向流式行的阅读器的人都可能想要调整缓冲区大小)。readline模块约为1000行,而包含统计和测试的readline模块为34行。

const EventEmitter = require('events').EventEmitter;
class LineReader extends EventEmitter{
constructor(f, delim='\n'){
super();
this.totalChars = 0;
this.totalLines = 0;
this.leftover = '';


f.on('data', (chunk)=>{
this.totalChars += chunk.length;
let lines = chunk.split(delim);
if (lines.length === 1){
this.leftover += chunk;
return;
}
lines[0] = this.leftover + lines[0];
this.leftover = lines[lines.length-1];
if (this.leftover) lines.pop();
this.totalLines += lines.length;
for (let l of lines) this.onLine(l);
});
// f.on('error', ()=>{});
f.on('end', ()=>{console.log('chars', this.totalChars, 'lines', this.totalLines)});
}
onLine(l){
this.emit('line', l);
}
}
//Command line test
const f = require('fs').createReadStream(process.argv[2], 'utf8');
const delim = process.argv[3];
const lineReader = new LineReader(f, delim);
lineReader.on('line', (line)=> console.log(line));

这是一个更短的版本,没有统计数据,只有19行:

class LineReader extends require('events').EventEmitter{
constructor(f, delim='\n'){
super();
this.leftover = '';
f.on('data', (chunk)=>{
let lines = chunk.split(delim);
if (lines.length === 1){
this.leftover += chunk;
return;
}
lines[0] = this.leftover + lines[0];
this.leftover = lines[lines.length-1];
if (this.leftover)
lines.pop();
for (let l of lines)
this.emit('line', l);
});
}
}
const fs = require("fs")


fs.readFile('./file', 'utf-8', (err, data) => {
var innerContent;
console.log("Asynchronous read: " + data.toString());
const lines = data.toString().split('\n')
for (let line of lines)
innerContent += line + '<br>';




});

另一种解决方案是通过顺序执行器nsynjs运行逻辑。它使用节点readline模块逐行读取文件,并且它不使用Promise或递归,因此不会在大文件上失败。代码如下所示:

var nsynjs = require('nsynjs');
var textFile = require('./wrappers/nodeReadline').textFile; // this file is part of nsynjs


function process(textFile) {


var fh = new textFile();
fh.open('path/to/file');
var s;
while (typeof(s = fh.readLine(nsynjsCtx).data) != 'undefined')
console.log(s);
fh.close();
}


var ctx = nsynjs.run(process,{},textFile,function () {
console.log('done');
});

上面的代码基于此示例:https://github.com/amaksr/nsynjs/blob/master/examples/node-readline/index.js

我在验证它不是目录并且它不包含在文件列表中后使用下面的代码读取行,无需检查。

(function () {
var fs = require('fs');
var glob = require('glob-fs')();
var path = require('path');
var result = 0;
var exclude = ['LICENSE',
path.join('e2e', 'util', 'db-ca', 'someother-file'),
path.join('src', 'favicon.ico')];
var files = [];
files = glob.readdirSync('**');


var allFiles = [];


var patternString = [
'trade',
'order',
'market',
'securities'
];


files.map((file) => {
try {
if (!fs.lstatSync(file).isDirectory() && exclude.indexOf(file) === -1) {
fs.readFileSync(file).toString().split(/\r?\n/).forEach(function(line){
patternString.map((pattern) => {
if (line.indexOf(pattern) !== -1) {
console.log(file + ' contain `' + pattern + '` in in line "' + line +'";');
result = 1;
}
});
});
}
} catch (e) {
console.log('Error:', e.stack);
}
});
process.exit(result);


})();

我已经查看了所有上述答案,它们都使用第三方库来解决它。它在Node的API中有一个简单的解决方案。例如

const fs= require('fs')


let stream = fs.createReadStream('<filename>', { autoClose: true })


stream.on('data', chunk => {
let row = chunk.toString('ascii')
}))

我将日常行处理的整个逻辑包装为一个npm模块:line-kit https://www.npmjs.com/package/line-kit

// example
var count = 0
require('line-kit')(require('fs').createReadStream('/etc/issue'),
(line) => { count++; },
() => {console.log(`seen ${count} lines`)})

2019年更新

一个很棒的例子已经发布在官方Nodejs留档。

这需要在您的机器上安装最新的Nodejs。>11.4

const fs = require('fs');
const readline = require('readline');


async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');


const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.


for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}


processLineByLine();

在进行此类操作时,我们必须问自己的两个问题是:

  1. 用于执行它的内存量是多少?
  2. 内存消耗是否随着文件大小而急剧增加?

require('fs').readFileSync()这样的解决方案会将整个文件加载到内存中。这意味着执行操作所需的内存量几乎与文件大小相等。我们应该避免对大于50mbs的任何东西使用这些

我们可以通过在函数调用之后放置这些代码行来轻松跟踪使用的内存量

    const used = process.memoryUsage().heapUsed / 1024 / 1024;
console.log(
`The script uses approximately ${Math.round(used * 100) / 100} MB`
);

现在从大文件中读取特定行的最佳方法是使用节点的readline。留档有惊人的示例

这是我最喜欢的浏览文件的方式,这是一个简单的本地解决方案,用于使用现代async/await读取渐进式(而不是“slurp”或全内存方式)文件。在处理大型文本文件时,我发现这是一个“自然”的解决方案,而无需求助于readline包或任何非核心依赖项。

let buf = '';
for await ( const chunk of fs.createReadStream('myfile') ) {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop();
for( const line of lines ) {
console.log(line);
}
}
if(buf.length) console.log(buf);  // last line, if file does not end with newline

您可以在fs.createReadStream中调整编码或使用chunk.toString(<arg>)。此外,这让您可以根据自己的口味更好地微调行拆分,即。使用.split(/\n+/)跳过空行并使用{ highWaterMark: <chunkSize> }控制块大小。

不要忘记创建像processLine(line)这样的函数,以避免由于buf的结尾剩余而重复两次行处理代码。不幸的是,ReadStream实例在此设置中不更新其文件结束标志,因此,在没有一些更详细的技巧(例如比较fs.Stats().bytesRead的文件大小)的情况下,无法在循环中检测我们是否处于最后一次迭代中。因此,最终的buf处理解决方案,除非你绝对确定你的文件以换行符\n结尾,在这种情况下,for await循环应该就足够了。

★如果你喜欢事件异步版本,这将是它:

let buf = '';
fs.createReadStream('myfile')
.on('data', chunk => {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop();
for( const line of lines ) {
console.log(line);
}
})
.on('end', () => buf.length && console.log(buf) );

★现在,如果你不介意导入stream核心包,那么这是等效的管道流版本,它允许链接转换,如gzip解压缩:

const { Writable } = require('stream');
let buf = '';
fs.createReadStream('myfile').pipe(
new Writable({
write: (chunk, enc, next) => {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop();
for (const line of lines) {
console.log(line);
}
next();
}
})
).on('finish', () => buf.length && console.log(buf) );

Node.jsv18.11.0中添加了一个新函数来逐行读取文件

  • filehandle.read线([选项])

这是您如何使用它与您要读取的文本文件

import { open } from 'node:fs/promises';
myFileReader();
async function myFileReader() {
const file = await open('./TextFileName.txt');
for await (const line of file.readLines()) {
console.log(line)
}
}

要了解更多读Node.js留档,这里是文件系统readline()的链接: https://nodejs.org/api/fs.html#filehandlereadlinesoptions