解析 Node.js 中的巨大日志文件-逐行读取

我需要对 Javascript/Node.js 中的大型(5-10 Gb)日志文件进行一些解析(我正在使用 Cube)。

日志线看起来像这样:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

我们需要读取每一行,进行一些解析(例如,去掉 57SUCCESS) ,然后使用 JS 客户机将这些数据输入 Cube (https://github.com/square/cube)。

首先,Node 中逐行读取文件的规范方法是什么?

这似乎是网上相当常见的问题:

许多答案似乎都指向一些第三方模块:

但是,这似乎是一个相当基本的任务——当然,在 stdlib 中有一种逐行读取文本文件的简单方法?

其次,我需要处理每一行(例如,将时间戳转换为 Date 对象,并提取有用的字段)。

最好的方法是什么,使吞吐量最大化?有没有什么方法可以不阻止读取每一行或者将其发送到 Cube?

第三-我猜测使用字符串分割,和 JS 等价的包含(IndexOf!= -1?)会比正则表达式快很多吗?有人在解析 Node.js 中的大量文本数据方面有很多经验吗?

干杯, 维克多

127616 次浏览

您可以使用内置的 readline包,参见文档 给你。我使用 溪流创建一个新的输出流。

    var fs = require('fs'),
readline = require('readline'),
stream = require('stream');
    

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
    

var rl = readline.createInterface({
input: instream,
output: outstream,
terminal: false
});
    

rl.on('line', function(line) {
console.log(line);
//Do your stuff ...
//Then write to output stream
rl.write(line);
});

处理大文件需要一些时间,请告诉我它是否有效。

Node-byline 使用流,所以我更喜欢用它来处理大型文件。

对于你的日期转换,我会使用 等一下 JS

为了最大化您的吞吐量,您可以考虑使用软件集群。还有一些很好的模块,它们很好地包装了节点本机集群模块。我喜欢艾萨克的 集群主机。例如,你可以创建一个 x worker 集群,它们都计算一个文件。

对于基准分割和正则表达式使用 基准 Js。我到现在还没有测试过它

我也有同样的问题。在比较了几个似乎具有这个特性的模块之后,我决定自己来做,它比我想象的要简单。

要点: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

它覆盖在一个闭包中打开的文件,返回的 fetchBlock()将从文件中获取一个块,结束拆分为数组(将处理上次获取的段)。

我已经将每个读操作的块大小设置为1024。这可能有 bug,但是代码逻辑是显而易见的,你可以自己试试。

我搜索了一个解决方案,使用流逐行解析非常大的文件(gbs)。所有第三方库和示例都不适合我的需要,因为它们不是逐行处理文件(比如1、2、3、4)。.)或将整个文件读入内存

下面的解决方案可以解析非常大的文件,逐行使用流和管道。为了测试,我使用了一个包含17.000.000条记录的2.1 gb 文件。内存使用量不超过60mb。

首先,安装 事件流软件包:

npm install event-stream

然后:

var fs = require('fs')
, es = require('event-stream');


var lineNr = 0;


var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){


// pause the readstream
s.pause();


lineNr += 1;


// process line here and call s.resume() when rdy
// function below was for logging memory usage
logMemoryUsage(lineNr);


// resume the readstream, possibly from a callback
s.resume();
})
.on('error', function(err){
console.log('Error while reading file.', err);
})
.on('end', function(){
console.log('Read entire file.')
})
);

enter image description here

请告诉我进展如何!

基于 这个的问题回答,我实现了一个类,你可以用它与 fs.readSync()同步逐行读取文件。您可以通过使用 Q承诺(jQuery似乎需要一个 DOM,因此无法使用 nodejs运行它)来实现这种“暂停”和“恢复”:

var fs = require('fs');
var Q = require('q');


var lr = new LineReader(filenameToLoad);
lr.open();


var promise;
workOnLine = function () {
var line = lr.readNextLine();
promise = complexLineTransformation(line).then(
function() {console.log('ok');workOnLine();},
function() {console.log('error');}
);
}
workOnLine();


complexLineTransformation = function (line) {
var deferred = Q.defer();
// ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
return deferred.promise;
}


function LineReader (filename) {
this.moreLinesAvailable = true;
this.fd = undefined;
this.bufferSize = 1024*1024;
this.buffer = new Buffer(this.bufferSize);
this.leftOver = '';


this.read = undefined;
this.idxStart = undefined;
this.idx = undefined;


this.lineNumber = 0;


this._bundleOfLines = [];


this.open = function() {
this.fd = fs.openSync(filename, 'r');
};


this.readNextLine = function () {
if (this._bundleOfLines.length === 0) {
this._readNextBundleOfLines();
}
this.lineNumber++;
var lineToReturn = this._bundleOfLines[0];
this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
return lineToReturn;
};


this.getLineNumber = function() {
return this.lineNumber;
};


this._readNextBundleOfLines = function() {
var line = "";
while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
this.idxStart = 0
while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
line = this.leftOver.substring(this.idxStart, this.idx);
this._bundleOfLines.push(line);
this.idxStart = this.idx + 1;
}
this.leftOver = this.leftOver.substring(this.idxStart);
if (line !== "") {
break;
}
}
};
}

我真的很喜欢 @ Gerard的答案,这里应该是正确的答案。我做了一些改进:

  • 代码在类(模块)中
  • 其中包括解析
  • 如果有一个异步作业被链接到读取 CSV,比如插入到 DB 或 HTTP 请求,恢复的能力将被赋予外部
  • 以块/批大小读取 用户可以声明。我也注意了流中的编码,以防 你有不同编码的文件。

密码是这样的:

'use strict'


const fs = require('fs'),
util = require('util'),
stream = require('stream'),
es = require('event-stream'),
parse = require("csv-parse"),
iconv = require('iconv-lite');


class CSVReader {
constructor(filename, batchSize, columns) {
this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
this.batchSize = batchSize || 1000
this.lineNumber = 0
this.data = []
this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
}


read(callback) {
this.reader
.pipe(es.split())
.pipe(es.mapSync(line => {
++this.lineNumber


parse(line, this.parseOptions, (err, d) => {
this.data.push(d[0])
})


if (this.lineNumber % this.batchSize === 0) {
callback(this.data)
}
})
.on('error', function(){
console.log('Error while reading file.')
})
.on('end', function(){
console.log('Read entirefile.')
}))
}


continue () {
this.data = []
this.reader.resume()
}
}


module.exports = CSVReader

因此,基本上,这里是你将如何使用它:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

我测试了这与35 GB 的 CSV 文件,它为我工作,这就是为什么我选择建立在 @ Gerard的答案,欢迎反馈。

我做了一个节点模块来异步读取大文件文本或 JSON。 在大文件上测试。

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');


module.exports = FileReader;


function FileReader(){


}


FileReader.prototype.read = function(pathToFile, callback){
var returnTxt = '';
var s = fs.createReadStream(pathToFile)
.pipe(es.split())
.pipe(es.mapSync(function(line){


// pause the readstream
s.pause();


//console.log('reading line: '+line);
returnTxt += line;


// resume the readstream, possibly from a callback
s.resume();
})
.on('error', function(){
console.log('Error while reading file.');
})
.on('end', function(){
console.log('Read entire file.');
callback(returnTxt);
})
);
};


FileReader.prototype.readJSON = function(pathToFile, callback){
try{
this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
}
catch(err){
throw new Error('json file is not valid! '+err.stack);
}
};

只需将文件保存为 file-reader. js,并像下面这样使用它:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});

我使用 https://www.npmjs.com/package/line-by-line从一个文本文件中读取超过100万行。在这种情况下,RAM 的占用容量约为50-60兆字节。

    const LineByLineReader = require('line-by-line'),
lr = new LineByLineReader('big_file.txt');


lr.on('error', function (err) {
// 'err' contains error object
});


lr.on('line', function (line) {
// pause emitting of lines...
lr.pause();


// ...do your asynchronous line processing..
setTimeout(function () {
// ...and continue emitting lines.
lr.resume();
}, 100);
});


lr.on('end', function () {
// All lines are read, file is closed now.
});

除了逐行读取大文件之外,您还可以逐块读取它。详情请参阅 这篇文章

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
offset += bytesRead;
var str = chunkBuffer.slice(0, bytesRead).toString();
var arr = str.split('\n');


if(bytesRead = chunkSize) {
// the last item of the arr may be not a full line, leave it to the next chunk
offset -= arr.pop().length;
}
lines.push(arr);
}
console.log(lines);
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
[s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
protected file: string;
protected csvOptions = {
delimiter: ',',
headers: true,
ignoreEmpty: true,
trim: true
};
constructor(file: string, csvOptions = {}) {
if (!fs.existsSync(file)) {
throw new Error(`File ${file} not found.`);
}
this.file = file;
this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
}
public read(callback: RowCallBack): Promise < Array < object >> {
return new Promise < Array < object >> (resolve => {
const readStream = fs.createReadStream(this.file);
const results: Array < any > = [];
let index = 0;
const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
index++;
results.push(await callback(data, index));
}).on('error', (err: Error) => {
console.error(err.message);
throw err;
}).on('end', () => {
resolve(results);
});
readStream.pipe(csvStream);
});
}
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
const reader = new CSVReader('./database/migrations/csv/users.csv');
const users = await reader.read(async data => {
return {
username: data.username,
name: data.name,
email: data.email,
cellPhone: data.cell_phone,
homePhone: data.home_phone,
roleId: data.role_id,
description: data.description,
state: data.state,
};
});
console.log(users);
})();

Js 文档提供了一个使用 Readline 模块的非常优雅的示例。

示例: 逐行读取文件流

const { once } = require('node:events');
const fs = require('fs');
const readline = require('readline');


const rl = readline.createInterface({
input: fs.createReadStream('sample.txt'),
crlfDelay: Infinity
});


rl.on('line', (line) => {
console.log(`Line from file: ${line}`);
});


await once(rl, 'close');

注意: 我们使用 crlf延迟选项将 CR LF (’r n’)的所有实例识别为单个换行符。

使用流和本机 nodejs 模块(F,读线)读/写文件 :

const fs = require('fs');
const readline = require('readline');


const rl = readline.createInterface({
input:  fs.createReadStream('input.json'),
output: fs.createWriteStream('output.json')
});


rl.on('line', function(line) {
console.log(line);


// Do any 'line' processing if you want and then write to the output file
this.output.write(`${line}\n`);
});


rl.on('close', function() {
console.log(`Created "${this.output.path}"`);
});