解析 Nodejs 的大型 JSON 文件

我有一个以 JSON 格式存储许多 JavaScript 对象的文件,我需要读取该文件,创建每个对象,并对它们进行处理(在我的例子中将它们插入到一个 db 中)。JavaScript 对象可以用一种格式表示:

格式 A:

[{name: 'thing1'},
....
{name: 'thing999999999'}]

格式 B:

{name: 'thing1'}         // <== My choice.
...
{name: 'thing999999999'}

注意,...表示许多 JSON 对象。我知道我可以把整个文件读入内存,然后像这样使用 JSON.parse():

fs.readFile(filePath, 'utf-8', function (err, fileContents) {
if (err) throw err;
console.log(JSON.parse(fileContents));
});

但是,文件可能非常大,我更愿意使用流来实现这一点。我在流中看到的问题是,文件内容可能在任何时候被分解成数据块,因此我如何在这样的对象上使用 JSON.parse()

理想情况下,每个对象将作为一个单独的数据块读取,但我不确定是否在 怎么做上。

var importStream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
importStream.on('data', function(chunk) {


var pleaseBeAJSObject = JSON.parse(chunk);
// insert pleaseBeAJSObject in a database
});
importStream.on('end', function(item) {
console.log("Woot, imported objects into the database!");
});*/

注意,我希望防止将整个文件读入内存。时间效率对我来说并不重要。是的,我可以尝试一次读取多个对象,然后一次插入所有对象,但这是一个性能调整——我需要一种保证不会导致内存过载的方法,无论文件中包含多少对象。

我可以选择使用 FormatA或者 FormatB或者其他什么,请在您的回答中指定。谢谢!

149798 次浏览

我觉得你需要一个数据库。MongoDB 在这种情况下是一个很好的选择,因为它是 JSON 兼容的。

更新 : 可以使用 蒙古进口货工具将 JSON 数据导入 MongoDB。

mongoimport --collection collection --file collection.json

正如我认为编写一个流 JSON 解析器会很有趣一样,我也认为也许我应该快速搜索一下,看看是否已经有一个可用的解析器。

事实证明是有的。

  • JSONStream < em > “ stream JSON.parse and stringify”

由于我刚刚发现它,我显然没有使用它,所以我不能评论它的质量,但我会有兴趣听到它是否工作。

它确实可以工作,考虑以下 Javascript 和 _.isString:

stream.pipe(JSONStream.parse('*'))
.on('data', (d) => {
console.log(typeof d);
console.log("isString: " + _.isString(d))
});

如果流是对象数组,这将在对象进入时记录它们。因此,每次只有一个对象被缓冲。

要逐行处理文件,您只需将文件的读取和作用于该输入的代码解耦。可以通过缓冲输入直到命中换行符来实现这一点。假设每行有一个 JSON 对象(基本上是 B 格式) :

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var buf = '';


stream.on('data', function(d) {
buf += d.toString(); // when data is read, stash it in a string buffer
pump(); // then process the buffer
});


function pump() {
var pos;


while ((pos = buf.indexOf('\n')) >= 0) { // keep going while there's a newline somewhere in the buffer
if (pos == 0) { // if there's more than one newline in a row, the buffer will now start with a newline
buf = buf.slice(1); // discard it
continue; // so that the next iteration will start with data
}
processLine(buf.slice(0,pos)); // hand off the line
buf = buf.slice(pos+1); // and slice the processed data off the buffer
}
}


function processLine(line) { // here's where we do something with a line


if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D)


if (line.length > 0) { // ignore empty lines
var obj = JSON.parse(line); // parse the JSON
console.log(obj); // do something with the data here!
}
}

每次文件流接收到来自文件系统的数据时,都会将其隐藏在缓冲区中,然后调用 pump

如果缓冲区中没有换行符,那么 pump只是返回而不执行任何操作。下一次流获得数据时,将向缓冲区添加更多的数据(可能还有换行符) ,然后我们就有了一个完整的对象。

如果有换行符,则 pump将缓冲区从开始切割到换行符,并将其交给 process。然后它再次检查缓冲区中是否有另一个换行符(while循环)。通过这种方式,我们可以处理当前块中读取的所有行。

最后,每个输入行调用一次 process。如果存在,它会去掉回车字符(以避免行结束符 & nash; LF vs CRLF 的问题) ,然后将 JSON.parse调用为 line。此时,您可以对您的对象执行任何需要的操作。

请注意,JSON.parse对接受什么作为输入是严格的; 您必须引用标识符和字符串值 双引号。换句话说,{name:'thing1'}将抛出一个错误; 您必须使用 {"name":"thing1"}

因为一次只有一块数据在内存中,这将极大地提高内存效率。而且速度极快。一个快速测试显示,我在15毫秒内处理了10,000行。

从2014年10月开始,你可以像下面这样做(使用 JSONStream)-< a href = “ https://www.npmjs.org/package/JSONStream”rel = “ norefrer”> https://www.npmjs.org/package/JSONStream

var fs = require('fs'),
JSONStream = require('JSONStream'),


var getStream() = function () {
var jsonData = 'myData.json',
stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
parser = JSONStream.parse('*');
return stream.pipe(parser);
}


getStream().pipe(MyTransformToDoWhateverProcessingAsNeeded).on('error', function (err) {
// handle any errors
});

举例说明:

npm install JSONStream event-stream

约翰逊:

{
"greeting": "hello world"
}

你好:

var fs = require('fs'),
JSONStream = require('JSONStream'),
es = require('event-stream');


var getStream = function () {
var jsonData = 'data.json',
stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
parser = JSONStream.parse('*');
return stream.pipe(parser);
};


getStream()
.pipe(es.mapSync(function (data) {
console.log(data);
}));
$ node hello.js
// hello world

我解决了这个问题使用的 分离式核磁共振模块。管道您的流成分裂,它将“ 分解一个流并重新组装它,以便每一行都是一个块”。

示例代码:

var fs = require('fs')
, split = require('split')
;


var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var lineStream = stream.pipe(split());
linestream.on('data', function(chunk) {
var json = JSON.parse(chunk);
// ...
});

我也有类似的需求,我需要读取节点 js 中的大型 json 文件,分块处理数据,调用 api 并保存到 mongodb 中。 Json 说:

{
"customers":[
{ /*customer data*/},
{ /*customer data*/},
{ /*customer data*/}....
]
}

现在,我使用 JsonStream 和 EventStream 来同步实现这一点。

var JSONStream = require("JSONStream");
var es = require("event-stream");


fileStream = fs.createReadStream(filePath, { encoding: "utf8" });
fileStream.pipe(JSONStream.parse("customers.*")).pipe(
es.through(function(data) {
console.log("printing one customer object read from file ::");
console.log(data);
this.pause();
processOneCustomer(data, this);
return data;
}),
function end() {
console.log("stream reading ended");
this.emit("end");
}
);


function processOneCustomer(data, es) {
DataModel.save(function(err, dataModel) {
es.resume();
});
}

我意识到,如果可能的话,您希望避免将整个 JSON 文件读入内存,但是如果您有可用的内存,那么从性能方面来说,这可能不是一个坏主意。在 json 文件上使用 node.js 的 need ()可以非常快地将数据加载到内存中。

我运行了两个测试,以查看从一个81MB 的 geojson 文件中打印出每个特性的属性时的性能。

在第一个测试中,我使用 var data = require('./geo.json')将整个 Geojson 文件读入内存。这需要3330毫秒,然后从每个特性中打印出一个属性需要804毫秒,总共需要4134毫秒。但是,node.js 似乎使用了411MB 的内存。

在第二个测试中,我在 JSONStream + event-stream 中使用了@arcseldon 的答案。我修改了 JSONPath 查询,以便只选择我需要的内容。这一次内存从来没有超过82MB,然而,整个事情现在需要70秒才能完成!

如果您可以控制输入文件,并且它是一个对象数组,则可以更容易地解决这个问题。安排在一行中输出每条记录的文件,如下所示:

[
{"key": value},
{"key": value},
...

这仍然是有效的 JSON。

然后,使用 node.js readline 模块一次处理一行。

var fs = require("fs");


var lineReader = require('readline').createInterface({
input: fs.createReadStream("input.txt")
});


lineReader.on('line', function (line) {
line = line.trim();


if (line.charAt(line.length-1) === ',') {
line = line.substr(0, line.length-1);
}


if (line.charAt(0) === '{') {
processRecord(JSON.parse(line));
}
});


function processRecord(record) {
// Process the records one at a time here!
}

我编写了一个模块来实现这个功能,称为 BFJ。具体来说,bfj.match方法可以用来将大型流分解为 JSON 的离散块:

const bfj = require('bfj');
const fs = require('fs');


const stream = fs.createReadStream(filePath);


bfj.match(stream, (key, value, depth) => depth === 0, { ndjson: true })
.on('data', object => {
// do whatever you need to do with object
})
.on('dataError', error => {
// a syntax error was found in the JSON
})
.on('error', error => {
// some kind of operational error occurred
})
.on('end', error => {
// finished processing the stream
});

在这里,bfj.match返回一个可读的对象模式流,它将接收解析后的数据项,并传递3个参数:

  1. 包含输入 JSON 的可读流。

  2. 一个谓词,指示来自解析的 JSON 的哪些项将被推送到结果流。

  3. 一个选项对象,指示输入为以换行符分隔的 JSON (这是为了处理问题中的格式 B,格式 A 不需要它)。

在被调用时,bfj.match将从输入流的深度优先解析 JSON,用每个值调用谓词,以确定是否将该项推送到结果流。谓词传递三个参数:

  1. 属性键或数组索引(对于顶级项目,这将是 undefined)。

  2. 价值本身。

  3. JSON 结构中项的深度(顶级项为零)。

当然,根据需求,也可以根据需要使用更复杂的谓词。如果希望对属性键执行简单的匹配,还可以传递字符串或正则表达式而不是谓词函数。

https.get(url1 , function(response) {
var data = "";
response.on('data', function(chunk) {
data += chunk.toString();
})
.on('end', function() {
console.log(data)
});
});

使用@josh3736作为回答,但是对于 ES2021和 Node.js 16 + ,使用异步/等待 + AirBnb 规则:

import fs from 'node:fs';


const file = 'file.json';


/**
* @callback itemProcessorCb
* @param {object} item The current item
*/


/**
* Process each data chunk in a stream.
*
* @param {import('fs').ReadStream} readable The readable stream
* @param {itemProcessorCb} itemProcessor A function to process each item
*/
async function processChunk(readable, itemProcessor) {
let data = '';
let total = 0;


// eslint-disable-next-line no-restricted-syntax
for await (const chunk of readable) {
// join with last result, remove CR and get lines
const lines = (data + chunk).replace('\r', '').split('\n');


// clear last result
data = '';


// process lines
let line = lines.shift();
const items = [];


while (line) {
// check if isn't a empty line or an array definition
if (line !== '' && !/[\[\]]+/.test(line)) {
try {
// remove the last comma and parse json
const json = JSON.parse(line.replace(/\s?(,)+\s?$/, ''));
items.push(json);
} catch (error) {
// last line gets only a partial line from chunk
// so we add this to join at next loop
data += line;
}
}


// continue
line = lines.shift();
}


total += items.length;


// Process items in parallel
await Promise.all(items.map(itemProcessor));
}


console.log(`${total} items processed.`);
}


// Process each item
async function processItem(item) {
console.log(item);
}


// Init
try {
const readable = fs.createReadStream(file, {
flags: 'r',
encoding: 'utf-8',
});


processChunk(readable, processItem);
} catch (error) {
console.error(error.message);
}

对于 JSON 来说,比如:

[
{ "name": "A", "active": true },
{ "name": "B", "active": false },
...
]