使用 node.js 流处理错误

用流处理错误的正确方法是什么?我已经知道有一个“错误”事件,你可以监听,但我想知道一些任意复杂的情况下更多的细节。

对于初学者,当你想做一个简单的管道链时,你会怎么做:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

如何正确地创建其中一个转换以便正确地处理错误?

更多相关问题:

  • 当一个错误发生时,‘ end’事件会发生什么?它从来没被开过火吗?有时会被炒鱿鱼吗?它是否依赖于转换/流?这里的标准是什么?
  • 是否存在通过管道传播错误的机制?
  • 域是否有效地解决了这个问题。
  • 从“错误”事件中产生的错误是否有堆栈跟踪?有时候?从来没有?有办法从他们那里拿到吗?
112760 次浏览

变形

变换流既可读又可写,因此是非常好的“中间”流。由于这个原因,它们有时被称为 through流。它们在这方面类似于双工流,只不过它们提供了一个很好的接口来操作数据,而不仅仅是发送数据。转换流的目的是在数据通过流传输时对其进行操作。例如,您可能希望执行一些异步调用,或者派生几个字段,重新映射一些内容,等等。


Where you might put a transform stream


有关如何创建转换流,请参阅 给你给你。所有您需要做的是:

  1. 包括流模块
  2. 实例化(或继承) Transform 类
  3. 实现一个采用 (chunk, encoding, callback)_transform方法。

数据块就是你的数据。如果您使用的是 objectMode = true,大多数情况下不需要担心编码问题。当您处理完数据块时,将调用回调函数。然后将这个块推送到下一个流。

如果你想要一个很好的助手模块,使你能够通过流真正容易地做,我建议 通过2

对于错误处理,请继续读取。

烟斗

在管道链中,处理错误确实非常重要。根据 这根线。管道()不是为转发错误而构建的。比如..。

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

只会监听 c流中的错误。如果在 a上发出一个错误事件,那么这个错误事件将不会传递下去,实际上会抛出。要正确地做到这一点:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

现在,尽管第二种方法更加冗长,但是您至少可以保留错误发生的上下文。这通常是件好事。

但是,如果您只想捕获目的地的错误,并且不太关心错误发生的位置,那么有一个库非常有用,那就是 事件流

结束

当触发错误事件时,将不会触发结束事件(显式地)。错误事件的发出将结束流。

域名

根据我的经验,域在大多数情况下都能很好地工作。如果您有一个未处理的错误事件(即在没有侦听器的流上发出错误) ,服务器可能会崩溃。现在,正如上面的文章所指出的,您可以将流包装在一个域中,该域应该能够正确地捕获所有错误。

var d = domain.create();
d.on('error', handleAllErrors);
d.run(function() {
fs.createReadStream(tarball)
.pipe(gzip.Gunzip())
.pipe(tar.Extract({ path: targetPath }))
.on('close', cb);
});

域的美妙之处在于它们将保留堆栈跟踪。尽管事件流在这方面也做得很好。

如需进一步阅读,请参阅 溪流手册。相当深入,但超级有用,并提供了一些伟大的链接到许多有用的模块。

域名已被弃用。你不需要它们。

对于这个问题,转换和可写之间的区别并不那么重要。

Mshell _ lauren 的答案很棒,但是作为一种替代方法,您还可以显式地监听每个流上您认为可能出错的错误事件。如果愿意,可以重用处理程序函数。

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()


a.on('error', handler)
b.on('error', handler)
c.on('error', handler)


a.pipe(b).pipe(c)


function handler (err) { console.log(err) }

这样做可以防止当其中一个流触发错误事件时发生声名狼藉的未捕获异常

整个链中的错误可以通过一个简单的函数传播到最右边的流中:

function safePipe (readable, transforms) {
while (transforms.length > 0) {
var new_readable = transforms.shift();
readable.on("error", function(e) { new_readable.emit("error", e); });
readable.pipe(new_readable);
readable = new_readable;
}
return readable;
}

可以这样使用:

safePipe(readable, [ transform1, transform2, ... ]);

.on("error", handler)只处理流错误,但是如果您使用自定义转换流,.on("error", handler)不捕捉发生在 _transform函数中的错误。因此,可以这样做来控制应用程序流:-

_transform函数中的 this关键字是指 Stream本身,即 EventEmitter。因此,您可以使用如下的 try catch来捕获错误,然后将它们传递给自定义事件处理程序。

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
var stream = this
try {
// Do your transform code
} catch (e) {
// Now based on the error type, with an if or switch statement
stream.emit("CTError1", e)
stream.emit("CTError2", e)
}
done()
}


// StreamImplementation.js
someReadStream
.pipe(CustomTransformStream)
.on("CTError1", function (e) { console.log(e) })
.on("CTError2", function (e) { /*Lets do something else*/ })
.pipe(someWriteStream)

这样,您可以将逻辑和错误处理程序分开。此外,您可以选择只处理一些错误,而忽略其他错误。

< p > 更新 < br > 替代方案: RXJS Obable

使用 Node.js 模式,创建一个 Transform 流机制并使用一个参数调用它的回调 done来传播错误:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);


transformStream1.prototype._transform = function (chunk, encoding, done) {
//var stream = this;


try {
// Do your transform code
/* ... */
} catch (error) {
// nodejs style for propagating an error
return done(error);
}


// Here, everything went well
done();
}


// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
.pipe(transformStream1)
.on('error', function (error) {
console.error('Error in transformStream1:');
console.error(error);
process.exit(-1);
})
.pipe(someWriteStream)
.on('close', function () {
console.log('OK.');
process.exit();
})
.on('error', function (error) {
console.error(error);
process.exit(-1);
});

如果使用 node > = v10.0.0,则可以使用 流,管道流,完成了

例如:

const { pipeline, finished } = require('stream');


pipeline(
input,
transformA,
transformB,
transformC,
(err) => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
});




finished(input, (err) => {
if (err) {
console.error('Stream failed', err);
} else {
console.log('Stream is done reading');
}
});

有关更多讨论,请参见此 Github 公关公司

Trycatch 无法捕获流中发生的错误,因为这些错误是在调用代码已退出后引发的。你可参考以下文件:

https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

使用 多管道软件包将多个流合并为一个双工流,并在一个地方处理错误。

const pipe = require('multipipe')


// pipe streams
const stream = pipe(streamA, streamB, streamC)




// centralized error handling
stream.on('error', fn)
const http = require('http');
const fs = require('fs');
const server = http.createServer();


server.on('request',(req,res)=>{
const readableStream = fs.createReadStream(__dirname+'/README.md');
const writeableStream = fs.createWriteStream(__dirname+'/assets/test.txt');
readableStream
.on('error',()=>{
res.end("File not found")
})
.pipe(writeableStream)
.on('error',(error)=>{
console.log(error)
res.end("Something went to wrong!")
})
.on('finish',()=>{
res.end("Done!")
})
})


server.listen(8000,()=>{
console.log("Server is running in 8000 port")
})