Node.js: piping the same readable stream into multiple (writable) targets

I need to run two commands in series that both read data from the same stream. After piping a stream into another, the buffer is emptied, so I can't read data from that stream again, which means this doesn't work:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');


var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);


inputStream.pipe(identify.stdin);


var chunks = [];
identify.stdout.on('data', function(chunk) {
  chunks.push(chunk);
});


identify.stdout.on('end', function() {
  var size = getSize(Buffer.concat(chunks)); // width
  var convert = spawn('convert', ['-', '-scale', size * 0.5, 'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});


function getSize(buffer) {
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Request complains about this:

Error: You cannot pipe after data has been emitted from the response.

Of course, changing the inputStream to fs.createReadStream yields the same problem. I don't want to write to a file, but to reuse in some way the stream that request produces (or any other stream, for that matter).

Is there a way to reuse a readable stream once it has finished piping? What would be the best way to accomplish the above example?


You have to create a duplicate of the stream by piping it into two streams. You can create a simple duplicate with a PassThrough stream; it simply passes the input through to the output.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;


const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();


a.stdout.pipe(b);
a.stdout.pipe(c);


let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

Output:

8
hi user

For the general problem, the following code works fine:

var PassThrough = require('stream').PassThrough;

var a = new PassThrough();
var b1 = new PassThrough();
var b2 = new PassThrough();

a.pipe(b1);
a.pipe(b2);

b1.on('data', function(data) {
  console.log('b1:', data.toString());
});
b2.on('data', function(data) {
  console.log('b2:', data.toString());
});

a.write('text');
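
Applied back to the question's identify/convert example, a sketch along these lines might work. One caveat: the branch feeding convert has to buffer the whole image until identify finishes, so the enlarged highWaterMark below is an assumption sized for small images; for bigger payloads you would buffer explicitly instead (see the concat-stream approach further down).

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');
var PassThrough = require('stream').PassThrough;

var inputStream = request('http://placehold.it/640x360');
var toIdentify = new PassThrough();
// Give the delayed branch room to hold the whole response; otherwise
// backpressure would stall both branches (assumed ~1 MB cap here).
var toConvert = new PassThrough({ highWaterMark: 1024 * 1024 });

// Duplicate the response up front, before any data is consumed.
inputStream.pipe(toIdentify);
inputStream.pipe(toConvert);

var identify = spawn('identify', ['-']);
toIdentify.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data', function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end', function() {
  var size = getSize(Buffer.concat(chunks)); // width
  var convert = spawn('convert', ['-', '-scale', String(size * 0.5), 'png:-']);
  toConvert.pipe(convert.stdin); // toConvert buffered the image in the meantime
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer) {
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}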

The first answer only works if streams take roughly the same amount of time to process data. If one takes significantly longer, the faster one will request new data, consequently overwriting the data still being used by the slower one (I had this problem after trying to solve it using a duplicate stream).

The following pattern worked very well for me. It uses Streamz, a library based on Streams2, together with Promises to synchronize async streams via a callback. Using the familiar example from the first answer:

var spawn = require('child_process').spawn;
var pass = require('stream').PassThrough;
var streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

var a = spawn('echo', ['hi user']);
var b = new pass();
var c = new pass();

a.stdout.pipe(streamz(combineStreamOperations));

function combineStreamOperations(data, next) {
  Promise.join(b, c, function(b, c) { // perform n operations on the same data
    next(); // request more
  });
}

var count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });

What about piping into two or more streams not at the same time?

For example:

var fs = require('fs');
var PassThrough = require('stream').PassThrough;

var mybinaryStream = stream.start(); // never-ending audio stream
var file1 = fs.createWriteStream('file1.wav', {encoding: 'binary'});
var file2 = fs.createWriteStream('file2.wav', {encoding: 'binary'});
var mypass = new PassThrough();

mybinaryStream.pipe(mypass);
mypass.pipe(file1);

setTimeout(function() {
  mypass.pipe(file2);
}, 2000);

The above code does not produce any errors, but file2 ends up empty.
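
One way around this (a sketch, not from the original thread, and reusing the question's mybinaryStream): give each destination its own PassThrough from the start, since a PassThrough buffers what it receives until something reads it.

var fs = require('fs');
var PassThrough = require('stream').PassThrough;

var tee1 = new PassThrough();
// The delayed branch must hold everything produced during the delay,
// so the buffer size here is an assumption tied to the 2 s window.
var tee2 = new PassThrough({ highWaterMark: 16 * 1024 * 1024 });

// Attach both branches immediately so neither misses early chunks.
mybinaryStream.pipe(tee1);
mybinaryStream.pipe(tee2);

tee1.pipe(fs.createWriteStream('file1.wav', {encoding: 'binary'}));

setTimeout(function() {
  // tee2 has been buffering since the start; piping now flushes what it
  // holds and then continues with live data.
  tee2.pipe(fs.createWriteStream('file2.wav', {encoding: 'binary'}));
}, 2000);

Note that once tee2's buffer fills, backpressure pauses the source and stalls tee1 as well, so for a truly never-ending stream this only works while the delay (and therefore the buffered amount) stays bounded.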

I have a different solution for writing to two streams simultaneously. Naturally, the time to write will be the sum of the two times, but I use it to respond to a download request where I want to keep a copy of the downloaded file on my server (actually, I use an S3 backup, so I cache the most-used files locally to avoid multiple transfers).

/**
* A utility class made to write to a file while answering a file download request
*/
class TwoOutputStreams {
constructor(streamOne, streamTwo) {
this.streamOne = streamOne
this.streamTwo = streamTwo
}


setHeader(header, value) {
if (this.streamOne.setHeader)
this.streamOne.setHeader(header, value)
if (this.streamTwo.setHeader)
this.streamTwo.setHeader(header, value)
}


write(chunk) {
this.streamOne.write(chunk)
this.streamTwo.write(chunk)
}


end() {
this.streamOne.end()
this.streamTwo.end()
}
}

You can then use this as a regular OutputStream

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

and pass it to your method as if it were a response or a file output stream.
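
For instance, a hypothetical sketch of such a handler (the Express-style route, the /cache path, and the fetchFromS3 helper are all assumptions for illustration, not part of the original answer):

const fs = require('fs');

// Hypothetical route: stream the file to the client
// while writing a cache copy at the same time.
app.get('/download/:name', (req, res) => {
  const fileOut = fs.createWriteStream('/cache/' + req.params.name);
  const out = new TwoOutputStreams(fileOut, res);

  fetchFromS3(req.params.name)             // assumed helper returning a readable stream
    .on('data', (chunk) => out.write(chunk))
    .on('end', () => out.end());
});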

If you have async operations on the PassThrough streams, the answers posted here won't work. A solution that works for async operations is to buffer the stream content and then create new streams from the buffered result.

  1. To buffer the result you can use concat-stream

    const Promise = require('bluebird');
    const concat = require('concat-stream');

    const getBuffer = function(stream) {
      return new Promise(function(resolve, reject) {
        var gotBuffer = function(buffer) {
          resolve(buffer);
        };
        var concatStream = concat(gotBuffer);
        stream.on('error', reject);
        stream.pipe(concatStream);
      });
    };
    
  2. To create streams from the buffer you can use the following (a combined usage sketch follows the list):

    const { Readable } = require('stream');

    const getBufferStream = function(buffer) {
      const stream = new Readable();
      stream.push(buffer);
      stream.push(null);
      return Promise.resolve(stream);
    };
    
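Putting the two helpers together, a minimal sketch (reusing the question's request setup, which is an assumption here) could look like:

const fs = require('fs');
const request = require('request');

const inputStream = request('http://placehold.it/640x360');

getBuffer(inputStream).then(function(buffer) {
  // Create as many independent readable streams from the buffer as needed.
  return Promise.all([getBufferStream(buffer), getBufferStream(buffer)]);
}).then(function(streams) {
  streams[0].pipe(process.stdout);                    // first consumer
  streams[1].pipe(fs.createWriteStream('copy.png'));  // second consumer (hypothetical path)
});

Since the whole payload is held in memory, this suits modestly sized data.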

You can use this small npm package I created:

readable-stream-clone

With this you can reuse readable streams as many times as you need.
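
A minimal usage sketch (the constructor-based API shown here is my reading of the package and should be treated as an assumption):

const fs = require('fs');
const ReadableStreamClone = require('readable-stream-clone');

const readStream = fs.createReadStream('source.txt');

// Each clone independently re-emits the source stream's data.
const clone1 = new ReadableStreamClone(readStream);
const clone2 = new ReadableStreamClone(readStream);

clone1.pipe(fs.createWriteStream('copy1.txt'));
clone2.pipe(fs.createWriteStream('copy2.txt'));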