如何将Node.js流的内容读入字符串变量?

我正在入侵一个Node程序,该程序使用smtp-protocol来捕获SMTP电子邮件并对邮件数据采取行动。库将邮件数据作为流提供,但我不知道如何将其转换为字符串。

我目前正在用stream.pipe(process.stdout, { end: false })将它写入标准输出,但正如我所说,我需要流数据在字符串中,一旦流结束,我就可以使用它。

我如何收集所有的数据从一个Node.js流到一个字符串?

215911 次浏览

(这个答案是多年前的,当时它是最好的答案。下面有一个更好的答案。我没有跟上node.js,我不能删除这个答案,因为它被标记为“正确的这个问题”。如果你想按下,你想让我做什么?)

关键是使用可读的流dataend事件。听下面这些事件:

stream.on('data', (chunk) => { ... });
stream.on('end', () => { ... });

当你收到data事件时,将新的数据块添加到为收集数据而创建的Buffer中。

当你收到end事件时,如果必要的话,将完成的Buffer转换为一个字符串。那就做你该做的事。

希望这比上面的答案更有用:

var string = '';
stream.on('data',function(data){
string += data.toString();
console.log('stream data ' + part);
});


stream.on('end',function(){
console.log('final output ' + string);
});

请注意,字符串连接并不是收集字符串部分的最有效方法,但使用它是为了简单(也许您的代码并不关心效率)。

此外,对于非ascii文本,这段代码可能会产生不可预测的失败(它假设每个字符都适合一个字节),但也许您也不关心这一点。

在nodejs 文档中,你应该这样做——总是记住一个字符串,而不知道编码只是一堆字节:

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
})

我通常使用这个简单的函数将流转换为字符串:

function streamToString(stream, cb) {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk.toString());
});
stream.on('end', () => {
cb(chunks.join(''));
});
}

使用的例子:

let stream = fs.createReadStream('./myFile.foo');
streamToString(stream, (data) => {
console.log(data);  // data is now my string variable
});

以上这些方法对我都没用。我需要使用Buffer对象:

  const chunks = [];


readStream.on("data", function (chunk) {
chunks.push(chunk);
});


// Send the buffer or you can put it into a var
readStream.on("end", function () {
res.send(Buffer.concat(chunks));
});

流没有简单的.toString()函数(我理解),也没有类似.toStringAsync(cb)函数(我不理解)的东西。

所以我创建了自己的helper函数:

var streamToString = function(stream, callback) {
var str = '';
stream.on('data', function(chunk) {
str += chunk;
});
stream.on('end', function() {
callback(str);
});
}


// how to use:
streamToString(myStream, function(myStr) {
console.log(myStr);
});

这为我工作,是基于Node v6.7.0文档:

let output = '';
stream.on('readable', function() {
let read = stream.read();
if (read !== null) {
// New stream data is available
output += read.toString();
} else {
// Stream is now finished when read is null.
// You can callback here e.g.:
callback(null, output);
}
});


stream.on('error', function(err) {
callback(err, null);
})

像减流器这样的东西怎么样?

下面是一个使用ES6类的例子如何使用一个。

var stream = require('stream')


class StreamReducer extends stream.Writable {
constructor(chunkReducer, initialvalue, cb) {
super();
this.reducer = chunkReducer;
this.accumulator = initialvalue;
this.cb = cb;
}
_write(chunk, enc, next) {
this.accumulator = this.reducer(this.accumulator, chunk);
next();
}
end() {
this.cb(null, this.accumulator)
}
}


// just a test stream
class EmitterStream extends stream.Readable {
constructor(chunks) {
super();
this.chunks = chunks;
}
_read() {
this.chunks.forEach(function (chunk) {
this.push(chunk);
}.bind(this));
this.push(null);
}
}


// just transform the strings into buffer as we would get from fs stream or http request stream
(new EmitterStream(
["hello ", "world !"]
.map(function(str) {
return Buffer.from(str, 'utf8');
})
)).pipe(new StreamReducer(
function (acc, v) {
acc.push(v);
return acc;
},
[],
function(err, chunks) {
console.log(Buffer.concat(chunks).toString('utf8'));
})
);

最简洁的解决方案可能是使用“string-stream”包,它将流转换为带有承诺的字符串。

const streamString = require('stream-string')


streamString(myStream).then(string_variable => {
// myStream was converted to a string, and that string is stored in string_variable
console.log(string_variable)


}).catch(err => {
// myStream emitted an error event (err), so the promise from stream-string was rejected
throw err
})

另一种方法是将流转换为承诺(参考下面的例子),并使用then(或await)将解析值分配给变量。

function streamToString (stream) {
const chunks = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
stream.on('error', (err) => reject(err));
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
})
}


const result = await streamToString(stream)

我更幸运的是这样使用:

let string = '';
readstream
.on('data', (buf) => string += buf.toString())
.on('end', () => console.log(string));

我使用节点v9.11.1readstream是来自http.get回调的响应。

使用相当流行的stream-buffers,你可能已经在你的项目依赖项中,这是非常简单的:

// imports
const { WritableStreamBuffer } = require('stream-buffers');
const { promisify } = require('util');
const { createReadStream } = require('fs');
const pipeline = promisify(require('stream').pipeline);


// sample stream
let stream = createReadStream('/etc/hosts');


// pipeline the stream into a buffer, and print the contents when done
let buf = new WritableStreamBuffer();
pipeline(stream, buf).then(() => console.log(buf.getContents().toString()));

使用流行(每周下载超过500万次)和轻量级get-stream库的简单方法:

https://www.npmjs.com/package/get-stream

const fs = require('fs');
const getStream = require('get-stream');


(async () => {
const stream = fs.createReadStream('unicorn.txt');
console.log(await getStream(stream)); //output is string
})();

,(被use utf8);

塞巴斯蒂安做得好。

我有“缓冲区问题”的几行测试代码,并添加编码信息,它解决了它,见下文。

说明问题

软件

// process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
console.log(typeof(data), data);
});

输入

hello world

输出

object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>

演示解决方案

软件

process.stdin.setEncoding('utf8'); // <- Activate!
process.stdin.on('data', (data) => {
console.log(typeof(data), data);
});

输入

hello world

输出

string hello world

还有一个是使用承诺的字符串:

function getStream(stream) {
return new Promise(resolve => {
const chunks = [];


# Buffer.from is required if chunk is a String, see comments
stream.on("data", chunk => chunks.push(Buffer.from(chunk)));
stream.on("end", () => resolve(Buffer.concat(chunks).toString()));
});
}


用法:

const stream = fs.createReadStream(__filename);
getStream(stream).then(r=>console.log(r));

如果需要,删除.toString()以用于二进制数据。

更新: @AndreiLED正确地指出了字符串有问题。我无法获得一个流,返回我拥有的节点版本的字符串,但api指出这是可能的。

在我的例子中,内容类型响应头是内容类型:文本/平原。所以,我已经从Buffer读取了数据:

let data = [];
stream.on('data', (chunk) => {
console.log(Buffer.from(chunk).toString())
data.push(Buffer.from(chunk).toString())
});
所有列出的答案似乎都以流动模式打开可读流,这不是NodeJS的默认模式,并且可能有局限性,因为它缺乏NodeJS在暂停可读流模式中提供的反压力支持。 这里是一个使用Just Buffers、本机流和本机流转换的实现,并支持对象模式

import {Transform} from 'stream';


let buffer =null;


function objectifyStream() {
return new Transform({
objectMode: true,
transform: function(chunk, encoding, next) {


if (!buffer) {
buffer = Buffer.from([...chunk]);
} else {
buffer = Buffer.from([...buffer, ...chunk]);
}
next(null, buffer);
}
});
}


process.stdin.pipe(objectifyStream()).process.stdout

你觉得这个怎么样?

async function streamToString(stream) {
// lets have a ReadableStream as a stream variable
const chunks = [];


for await (const chunk of stream) {
chunks.push(Buffer.from(chunk));
}


return Buffer.concat(chunks).toString("utf-8");
}