使用 NodeJS 解析 CSV 文件

我想用 nodejs 解析一个包含 10000 条记录的 csv 文件,并对每一行执行一些操作。我试过用 http://www.adaltas.com/projects/node-csv。我无法让它在每一行都停下来,它只是一次性通读了全部 10000 条记录。我需要做到以下几点:

  1. 逐行读取 csv
  2. 对每一行进行耗时的操作
  3. 去下一行

有人能提出其他的建议吗?

399136 次浏览

似乎您需要使用一个基于流的库,比如 快速 CSV,它还包括验证支持。

您引用的 node-CSV 项目完全足以完成从以下文档转换大部分 CSV 数据的每一行的任务:

csv()
// Parse an inline CSV string (old node-csv v1 chaining API).
.from('82,Preisner,Zbigniew\n94,Gainsbourg,Serge')
// Write each transformed row back out (here: to the console).
.to(console.log)
// transform() receives (row, index, callback). Because the result is
// delivered via callback(err, transformedRow), you can do asynchronous,
// time-consuming work per row; the next row is only fed in after the
// callback fires — which is exactly the per-line pause the question asks for.
.transform(function(row, index, callback){
process.nextTick(function(){
callback(null, row.reverse());
});
});

根据我的经验,我可以说这也是一个相当快的实现,我一直在使用它处理近10k 记录的数据集,整个数据集的处理时间在一个合理的几十毫秒的水平。

针对 Jurka 提出的基于流的解决方案建议的补充说明:node-csv 本身就是基于流的,并且遵循 Node.js 的流 API。

我当前的解决方案使用异步模块串行执行:

var fs = require('fs');
var parse = require('csv-parse');
var async = require('async');

// Path of the CSV file to process.
var inputFile = 'myfile.csv';

// Parse the whole file first, then process the rows strictly one at a time.
// async.eachSeries only moves to the next row after the previous row's
// callback fires, giving the "pause on each line" behaviour asked for.
var parser = parse({delimiter: ','}, function (err, data) {
  // Bug fix: a parse error used to be silently ignored, leaving `data`
  // undefined and crashing eachSeries.
  if (err) {
    console.error(err);
    return;
  }
  async.eachSeries(data, function (line, callback) {
    // do something with the line
    doSomething(line).then(function () {
      // when processing finishes invoke the callback to move to the next one
      callback();
    }, function (err) {
      // Bug fix: a rejected promise used to stall the series forever
      // (callback was never invoked); forward the error instead.
      callback(err);
    });
  });
});
fs.createReadStream(inputFile).pipe(parser);

逐行尝试 npm 插件。

npm install line-by-line --save

我用这种方式:-

var fs = require('fs');
var parse = require('csv-parse');

// Every parsed row is collected here; the full set is only complete
// once the 'end' event has fired.
var csvData = [];

var parserStream = parse({delimiter: ':'});

fs.createReadStream(req.file.path)
  .pipe(parserStream)
  .on('data', (csvrow) => {
    console.log(csvrow);
    //do something with csvrow
    csvData.push(csvrow);
  })
  .on('end', () => {
    //do something with csvData
    console.log(csvData);
  });

为了在 快速 CSV中暂停流,您可以执行以下操作:

// fast-csv exposes pause()/resume(), so the stream can be halted while a
// slow per-row task runs and restarted when it finishes.
let csvstream = csv.fromPath(filePath, { headers: true });

csvstream
  .on("data", (row) => {
    // Stop new rows from arriving until the heavy work below completes.
    csvstream.pause();
    // do some heavy work
    // when done resume the stream
    csvstream.resume();
  })
  .on("end", () => {
    console.log("We are done!");
  })
  .on("error", (error) => {
    console.log(error);
  });

npm install csv

CSV 文件样本 你需要一个 CSV 文件来解析,所以要么你已经有了一个,要么你可以复制下面的文本,粘贴到一个新的文件,并调用该文件“ mycsv.CSV”

ABC, 123, Fudge
532, CWE, ICECREAM
8023, POOP, DOGS
441, CHEESE, CARMEL
221, ABC, HOUSE
ABC, 123, Fudge
532, CWE, ICECREAM
8023, POOP, DOGS
441, CHEESE, CARMEL
221, ABC, HOUSE

读取和解析 CSV 文件的示例代码

创建一个新文件,并在其中插入以下代码。一定要仔细阅读幕后发生的事情。

    var csv = require('csv');
// loads the csv module referenced above.
// NOTE(review): obj.from.path(...).to.array(...) is the old node-csv v0.x
// API; newer versions of the csv package expose csv.parse() instead —
// confirm the installed version before copying this.


var obj = csv();
// gets the csv module to access the required functionality


function MyCSV(Fone, Ftwo, Fthree) {
this.FieldOne = Fone;
this.FieldTwo = Ftwo;
this.FieldThree = Fthree;
};
// Define the MyCSV object with parameterized constructor, this will be used for storing the data read from the csv into an array of MyCSV. You will need to define each field as shown above.
// NOTE(review): the sample output later in this answer shows properties
// named `Fieldone`/`Fieldtwo`/`Fieldthree`, which does not match the
// `FieldOne`/`FieldTwo`/`FieldThree` set here — verify against a real run.


var MyData = [];
// MyData array will contain the data from the CSV file and it will be sent to the clients request over HTTP.


obj.from.path('../THEPATHINYOURPROJECT/TOTHE/csv_FILE_YOU_WANT_TO_LOAD.csv').to.array(function (data) {
for (var index = 0; index < data.length; index++) {
MyData.push(new MyCSV(data[index][0], data[index][1], data[index][2]));
}
console.log(MyData);
});
//Reads the CSV file from the path you specify, and the data is stored in the array we specified using callback function.  This function iterates through an array and each line from the CSV file will be pushed as a record to another array called MyData , and logs the data into the console to ensure it worked.


var http = require('http');
//Load the http module.


var server = http.createServer(function (req, resp) {
resp.writeHead(200, { 'content-type': 'application/json' });
resp.end(JSON.stringify(MyData));
});
// Create a webserver with a request listener callback.  This will write the response header with the content type as json, and end the response by sending the MyData array in JSON format.
// NOTE(review): MyData is filled asynchronously; a request that arrives
// before the CSV callback has run will receive an empty array.


server.listen(8080);
// Tells the webserver to listen on port 8080(obviously this may be whatever port you want.)
var csv = require('csv');
// loads the csv module referenced above.

var obj = csv();
// gets the csv module to access the required functionality

function MyCSV(Fone, Ftwo, Fthree) {
this.FieldOne = Fone;
this.FieldTwo = Ftwo;
this.FieldThree = Fthree;
};
// Define the MyCSV object with parameterized constructor, this will be used for storing the data read from the csv into an array of MyCSV. You will need to define each field as shown above.

var MyData = [];
// MyData array will contain the data from the CSV file and it will be sent to the clients request over HTTP.

obj.from.path('../THEPATHINYOURPROJECT/TOTHE/csv_FILE_YOU_WANT_TO_LOAD.csv').to.array(function (data) {
for (var index = 0; index < data.length; index++) {
MyData.push(new MyCSV(data[index][0], data[index][1], data[index][2]));
}
console.log(MyData);
});
//Reads the CSV file from the path you specify, and the data is stored in the array we specified using callback function.  This function iterates through an array and each line from the CSV file will be pushed as a record to another array called MyData , and logs the data into the console to ensure it worked.

var http = require('http');
//Load the http module.

var server = http.createServer(function (req, resp) {
resp.writeHead(200, { 'content-type': 'application/json' });
resp.end(JSON.stringify(MyData));
});
// Create a webserver with a request listener callback.  This will write the response header with the content type as json, and end the response by sending the MyData array in JSON format.

server.listen(8080);
// Tells the webserver to listen on port 8080(obviously this may be whatever port you want.)
Things to be aware of in your app.js code
In lines 7 through 11, we define the function called 'MyCSV' and the field names.


If your CSV file has multiple columns make sure you define this correctly to match your file.


On line 17 we define the location of the CSV file of which we are loading.  Make sure you use the correct path here.

启动应用程序并验证功能 打开控制台并键入以下命令:

node app.js

您应该在控制台中看到以下输出:

[  MYCSV { Fieldone: 'ABC', Fieldtwo: '123', Fieldthree: 'Fudge' },
MYCSV { Fieldone: '532', Fieldtwo: 'CWE', Fieldthree: 'ICECREAM' },
MYCSV { Fieldone: '8023', Fieldtwo: 'POOP', Fieldthree: 'DOGS' },
MYCSV { Fieldone: '441', Fieldtwo: 'CHEESE', Fieldthree: 'CARMEL' },
MYCSV { Fieldone: '221', Fieldtwo: 'ABC', Fieldthree: 'HOUSE' }, ]

现在你应该打开一个网页浏览器并导航到你的服务器,你应该看到它以 JSON 格式输出数据。

结论 通过使用 node.js 和它的 CSV 模块,我们可以快速轻松地读取和使用存储在服务器上的数据,并在客户机请求时使其可用

  • 此解决方案使用 csv-parser而不是某些 以上的答案。
  • csv-parser大约在两年后出现 csv-parse.
  • 它们都解决了同样的问题,但我个人发现 csv-parser更好,因为它很容易通过它处理标头。

首先安装 csv-parser:

npm install csv-parser

假设你有一个这样的 csv 文件:

NAME, AGE
Lionel Messi, 31
Andres Iniesta, 34

您可以按以下方式执行所需的操作:

const fs = require('fs');
const csv = require('csv-parser');

// Stream the file through csv-parser; each 'data' event delivers one row
// as an object keyed by the header names (NAME, AGE in the sample file).
const rowStream = fs.createReadStream(inputFilePath).pipe(csv());

rowStream
  .on('data', (data) => {
    try {
      console.log("Name is: " + data.NAME);
      console.log("Age is: " + data.AGE);

      //perform the operation
    } catch (err) {
      //error handler
    }
  })
  .on('end', () => {
    //some final operation
  });

进一步阅读 参考

正如注释中提到的,使用 csv-parser 而不是 csv-parse 的另一个好处是:

Csv-parser 大约是27KB,而 csv-parse 是1.6 MB

我需要一个异步 csv 阅读器,最初尝试了@Pransh Tiwari 的答案,但无法让它与 await 和 util.promisify() 一起工作。最后,我遇到了 Node-csvtojson,它与 csv-parser 的功能差不多,但是基于 Promise。下面是 csvtojson 的一个实际使用例子:

const csvToJson = require('csvtojson');

// csvtojson returns a promise, so the whole file can simply be awaited
// and the rows are guaranteed to be loaded before the loop below runs.
const processRecipients = async () => {
  const converterOptions = { trim: true };
  const recipients = await csvToJson(converterOptions).fromFile('./recipients.csv');

  // Code executes after recipients are fully loaded.
  for (const recipient of recipients) {
    console.log(recipient.name, recipient.email);
  }
};
// Read the whole file at once (fine for small files; prefer a stream
// for very large ones).
// Bug fix: `fs` was an implicit global; declare it properly.
const fs = require('fs');
fs.readFile('FILENAME WITH PATH', 'utf8', function (err, content) {
  if (err) {
    console.log('error occured ' + JSON.stringify(err));
    // Bug fix: without this return, the success path below still ran
    // and logged `undefined` as the file content.
    return;
  }
  console.log('Fileconetent are ' + JSON.stringify(content));
});

快速 CSV npm 模块可以从 csv 文件逐行读取数据。

这里有一个例子:

let csv = require('fast-csv');

// Open the file as a raw read stream and hand it to fast-csv;
// `headers: true` turns each line into an object keyed by the header row.
var stream = fs.createReadStream("my.csv");

const parsing = csv.parseStream(stream, { headers: true });

parsing
  .on("data", (data) => {
    console.log('I am one line of data', data);
  })
  .on("end", () => {
    console.log("done");
  });

您可以使用 csv-to-json 模块将 csv 转换为 json 格式,然后可以在程序中轻松地使用 json 文件

这是我从外部 URL 获取 csv 文件的解决方案

const parse = require( 'csv-parse/lib/sync' );
const axios = require( 'axios' );

// Fetch a CSV file from an external URL and synchronously parse it into an
// array of objects (one per row, keyed by the header line).
// Returns undefined when the download or the parse fails.
const readCSV = ( module.exports.readCSV = async ( path ) => {
  try {
    const res = await axios( { url: path, method: 'GET', responseType: 'blob' } );
    let records = parse( res.data, {
      columns: true,           // use the first line as property names
      skip_empty_lines: true
    } );

    return records;
  } catch ( e ) {
    // Bug fix: the error itself was swallowed (only the literal string
    // 'err' was logged), making failures impossible to diagnose.
    console.log( 'err', e );
  }

} );
readCSV('https://urltofilecsv');

我使用这个简单的方法: https://www.npmjs.com/package/csv-parser

使用非常简单:

const csv = require('csv-parser')
const fs = require('fs')

// Every parsed row lands here; inspect it once the 'end' event fires.
const results = [];

const source = fs.createReadStream('./CSVs/Update 20191103C.csv');

source
  .pipe(csv())
  .on('data', function (row) {
    results.push(row);
  })
  .on('end', function () {
    console.log(results);
    console.log(results[0]['Lowest Selling Price'])
  });

我使用的是 csv-parse,但是对于较大的文件,它会遇到性能问题。我发现一个更好的库是 Papa Parse(papaparse),文档很好,支持很好,轻量级,没有依赖。

安装 papaparse

npm install papaparse

用法:

  • 异步/等待
const fs = require('fs');
const Papa = require('papaparse');

const csvFilePath = 'data/test.csv'

// Function to read csv which returns a promise so you can do async / await.
// The file is read synchronously, then Papa Parse delivers the parsed rows
// through its `complete` callback, which we adapt to a Promise.
const readCSV = async (filePath) => {
  const csvData = fs.readFileSync(filePath).toString();
  return new Promise((resolve) => {
    Papa.parse(csvData, {
      header: true,
      transformHeader: (header) => header.trim(),
      complete: (results) => {
        console.log('Complete', results.data.length, 'records.');
        resolve(results.data);
      },
    });
  });
};

// Demonstrates awaiting the parsed rows.
const test = async () => {
  let parsedData = await readCSV(csvFilePath);
}

test()
  • 复试
const fs = require('fs');
const Papa = require('papaparse');

const csvFilePath = 'data/test.csv'

// Stream the file into Papa Parse; `step` fires once per row, so a very
// large file never has to sit in memory as a single string.
const file = fs.createReadStream(csvFilePath);

var csvData = [];
Papa.parse(file, {
  header: true,
  transformHeader: (header) => header.trim(),
  step: (result) => {
    // One parsed row per invocation.
    csvData.push(result.data);
  },
  complete: (results, file) => {
    console.log('Complete', csvData.length, 'records.');
  },
});

注意,header: true是配置中的一个选项,有关其他选项,请参阅文档

好吧,所以这里有很多答案,我不认为他们回答你的问题,我认为是类似于我的问题。

您需要执行类似于联系数据库或第三部分 api 的操作,这需要花费时间并且是异步的。由于体积太大或其他原因,您不想将整个文档加载到内存中,因此需要逐行读取进程。

我读过 fs 的文档:流是可以暂停读取的,但大多数答案使用的 .on('data') 调用会让数据持续不断地流入,这正是造成问题的原因。


更新: 我知道的关于 Streams 的信息比我想要的还要多

最好的方法是创建一个可写的流。这将把 csv 数据导入到可写流中,以便管理异步调用。管道将管理缓冲区,所有的方式回到读取器,所以您不会结束与沉重的内存使用

简单版

const parser = require('csv-parser');
const stripBom = require('strip-bom-stream');
const stream = require('stream')

// A minimal object-mode Writable: the pipe machinery will not deliver the
// next row until done() is called, so back-pressure (and therefore the
// "pause per line" behaviour) is handled for free.
const mySimpleWritable = new stream.Writable({
  objectMode: true, // Because input is object from csv-parser
  // Required. Invoked once per CSV line; `chunk` is the parsed row object.
  write(chunk, encoding, done) {
    console.log('chunk', chunk);
    done();
  },
  // Optional. Invoked after the final row — last place to clean up.
  final(done) {
    done();
  }
});
fs.createReadStream(fileNameFull).pipe(stripBom()).pipe(parser()).pipe(mySimpleWritable)


课堂版本

const parser = require('csv-parser');
const stripBom = require('strip-bom-stream');
const stream = require('stream')

// Create writable class
class MyWritable extends stream.Writable {
  // Used to set object mode because we get an object piped in from csv-parser
  constructor(another_variable, options) {
    // Calls the stream.Writable() constructor.
    super({ ...options, objectMode: true });
    // additional information if you want
    this.another_variable = another_variable
  }

  // The write method.
  // Called over and over, for each line in the csv.  The pipe will not
  // deliver the next line until done() runs, so any awaited work here
  // naturally pauses the whole pipeline.
  async _write(chunk, encoding, done) {
    // The chunk will be a line of your csv as an object
    console.log('Chunk Data', this.another_variable, chunk)

    // demonstrate await call
    // This will pause the process until it is finished
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Very important to add.  Keeps the pipe buffers correct.  Will load the next line of data
    done();
  };

  // Gets called when all lines have been read
  async _final(done) {
    // Can do more calls here with left over information in the class
    console.log('clean up')
    // lets pipe know its done and the .on('finish') will be called
    done()
  }
}

// Instantiate the new writable class.
// Bug fix: `myWritable` was assigned without a declaration, creating an
// implicit global (a ReferenceError in strict mode / ES modules).
const myWritable = new MyWritable(somevariable)

// Pipe the read stream to csv-parser, then to your write class
// stripBom is due to Excel saving csv files with UTF8 - BOM format
fs.createReadStream(fileNameFull).pipe(stripBom()).pipe(parser()).pipe(myWritable)
  // optional
  .on('finish', () => {
    // will be called after the writable's internal _final
    console.log('Called very last')
  })

老方法:

可读性问题

const csv = require('csv-parser');
const fs = require('fs');

// Read a CSV file row by row using the 'readable' event plus rr.read(),
// which (unlike 'data') lets the consumer pull rows at its own pace.
const processFileByLine = async (fileNameFull) => {

  // Guards against the 'readable' handler body running a second time
  // (observed near end-of-file on both small and large files).
  let reading = false

  const rr = fs.createReadStream(fileNameFull)
    .pipe(csv())

  // Magic happens here
  rr.on('readable', async function () {
    // Called once when data starts flowing
    console.log('starting readable')

    // Found this might be called a second time for some reason
    // This will stop that event from happening
    if (reading) {
      console.log('ignoring reading')
      return
    }
    reading = true

    // Bug fix: `data` was an implicit global; declare it locally so
    // strict mode / concurrent use cannot break it.
    let data
    while (null !== (data = rr.read())) {
      // data variable will be an object with information from the line it read
      // PROCESS DATA HERE
      console.log('new line of data', data)
    }

    // All lines have been read and file is done.
    // End event will be called about now so that code will run before below code
    console.log('Finished readable')
  })

  rr.on("end", function () {
    // File has finished being read
    console.log('closing file')
  });

  rr.on("error", err => {
    // Some basic error handling for fs error events
    console.log('error', err);
  });
}

您将注意到一个 reading标志。我已经注意到,由于某种原因正好在文件的末尾。On (‘ readable’)在小文件和大文件上第二次调用。我不知道为什么,但这阻止从第二个进程阅读相同的行项目。

var fs = require("fs");
// READ CSV INTO STRING
var data = fs.readFileSync("your.csv").toLocaleString();

// STRING TO ARRAY
// NOTE(review): this naive split does not handle quoted fields containing
// commas/newlines, nor \r\n line endings — only safe for simple files.
var rows = data.split("\n"); // SPLIT ROWS
rows.forEach((row) => {
  // Bug fix: `columns` was an implicit global; declare it per iteration.
  const columns = row.split(","); //SPLIT COLUMNS
  console.log(columns);
})

我用承诺的方法做到了这一点

const fs = require('fs')
const {parse} = require('csv-parse')

// Read a CSV file and resolve with its rows (arrays of trimmed strings).
// Rejects if the file cannot be read or the content cannot be parsed.
function readFile(path) {
  return new Promise((resolve, reject) => {
    fs.readFile(path, function (err, fileData) {
      // Bug fix: a file-read error used to be ignored, handing undefined
      // data straight to the parser.
      if (err) {
        reject(err)
        return
      }
      parse(fileData, {columns: false, trim: true}, function (err, rows) {
        if (err) {
          reject(err)
          // Bug fix: without this return, resolve(rows) also ran after
          // reject(), resolving the promise with undefined rows.
          return
        }
        resolve(rows)
      })
    })
  })
}

Csv-parse 目前支持 异步迭代器,它应该非常适合您的用例