如何从Node.Js中的字符串创建流?

我正在使用一个库ya-csv,它期望文件或流作为输入,但我有一个字符串。

如何将该字符串转换为节点中的流?

204035 次浏览

JavaScript是鸭子类型的,所以如果你只是复制可读流的API,它会工作得很好。事实上,你可能不能实现这些方法中的大多数,或者只是把它们作为存根;您需要实现的只是库使用的内容。你也可以使用Node预先构建的EventEmitter来处理事件,所以你不必自己实现addListener之类的。

下面是如何在CoffeeScript中实现它:

class StringStream extends require('events').EventEmitter
constructor: (@string) -> super()


readable: true
writable: false


setEncoding: -> throw 'not implemented'
pause: ->    # nothing to do
resume: ->   # nothing to do
destroy: ->  # nothing to do
pipe: -> throw 'not implemented'


send: ->
@emit 'data', @string
@emit 'end'

然后你可以这样使用它:

stream = new StringStream someString
doSomethingWith stream
stream.send()

只需要创建一个stream模块的新实例,并根据你的需要定制它:

var Stream = require('stream');
var stream = new Stream();


stream.pipe = function(dest) {
dest.write('your string');
return dest;
};


stream.pipe(process.stdout); // in this case the terminal, change to ya-csv

var Stream = require('stream');
var stream = new Stream();


stream.on('data', function(data) {
process.stdout.write(data); // change process.stdout to ya-csv
});


stream.emit('data', 'this is my string');

编辑: 中庭的回答可能更好。

我以前的答案文本保存在下面。


要将字符串转换为流,可以使用暂停的通过流:

through().pause().queue('your string').end()

例子:

var through = require('through')


// Create a paused stream and buffer some data into it:
var stream = through().pause().queue('your string').end()


// Pass stream around:
callback(null, stream)


// Now that a consumer has attached, remember to resume the stream:
stream.resume()

正如@substack#节点中纠正了我,节点v10中的新流API使这更容易:

const Readable = require('stream').Readable;
const s = new Readable();
s._read = () => {}; // redundant? see update below
s.push('your text here');
s.push(null);

之后,你可以自由地它或以其他方式将它传递给你的预期消费者。

它不像概括一行程序那样简洁,但它确实避免了额外的依赖。

(到目前为止,v0.10.26到v9.2.1版本中的更新:,如果你没有设置_read,直接从REPL提示符调用push将会崩溃并出现not implemented异常。它不会在函数或脚本中崩溃。如果不一致让你紧张,包括noop。)

在coffee-script:

class StringStream extends Readable
constructor: (@str) ->
super()


_read: (size) ->
@push @str
@push null

使用它:

new StringStream('text here').pipe(stream1).pipe(stream2)

不要用Jo Liss的简历回答。这在大多数情况下都是可行的,但在我的案例中,它却让我花了4或5个小时去寻找漏洞。不需要第三方模块来做这件事。

新回答:

var Readable = require('stream').Readable


var s = new Readable()
s.push('beep')    // the string you want
s.push(null)      // indicates end-of-file basically - the end of the stream

这应该是一个完全兼容的可读流。在这里看到的获取关于如何正确使用流的更多信息。

< p > 旧的答案: 只需使用本地PassThrough流:

var stream = require("stream")
var a = new stream.PassThrough()
a.write("your string")
a.end()


a.pipe(process.stdout) // piping will work as normal
/*stream.on('data', function(x) {
// using the 'data' event works too
console.log('data '+x)
})*/
/*setTimeout(function() {
// you can even pipe after the scheduler has had time to do other things
a.pipe(process.stdout)
},100)*/


a.on('end', function() {
console.log('ended') // the end event will be called properly
})

注意,'close'事件不会被触发(流接口不需要)。

有一个模块用于此:https://www.npmjs.com/package/string-to-stream

var str = require('string-to-stream')
str('hi there').pipe(process.stdout) // => 'hi there'

我厌倦了每六个月重新学习一次,所以我发布了一个npm模块来抽象实现细节:

https://www.npmjs.com/package/streamify-string

这是该模块的核心:

const Readable = require('stream').Readable;
const util     = require('util');


function Streamify(str, options) {


if (! (this instanceof Streamify)) {
return new Streamify(str, options);
}


Readable.call(this, options);
this.str = str;
}


util.inherits(Streamify, Readable);


Streamify.prototype._read = function (size) {


var chunk = this.str.slice(0, size);


if (chunk) {
this.str = this.str.slice(size);
this.push(chunk);
}


else {
this.push(null);
}


};


module.exports = Streamify;

str是在调用时必须传递给构造函数的string,并将由流作为数据输出。options是根据的文档可以传递给流的典型选项。

根据Travis CI,它应该与大多数版本的节点兼容。

另一个解决方案是将read函数传递给Readable的构造函数(cf doc 流可读选项)

var s = new Readable({read(size) {
this.push("your string here")
this.push(null)
}});

例如,你可以使用s.pipe

下面是TypeScript中的一个简洁的解决方案:

import { Readable } from 'stream'


class ReadableString extends Readable {
private sent = false


constructor(
private str: string
) {
super();
}


_read() {
if (!this.sent) {
this.push(Buffer.from(this.str));
this.sent = true
}
else {
this.push(null)
}
}
}


const stringStream = new ReadableString('string to be streamed...')

从节点10.17,流。Readable有from方法可以轻松地从任何可迭代对象(包括数组字面值)创建流:

const { Readable } = require("stream")


const readable = Readable.from(["input string"])


readable.on("data", (chunk) => {
console.log(chunk) // will be called once with `"input string"`
})

请注意,至少在10.17和12.3之间,字符串本身是一个可迭代对象,因此Readable.from("input string")可以工作,但每个字符会触发一个事件。Readable.from(["input string"])将为数组中的每一项触发一个事件(在本例中为一项)。

还要注意,在以后的节点中(可能是12.3,因为文档说函数在那时已经改变了),不再需要将字符串包装到数组中。

https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options

在NodeJS中,你可以通过以下几种方式创建一个可读流:

解决方案1

你可以用fs模块来做。函数fs.createReadStream()允许你打开一个可读的流,你所要做的就是传递文件的路径来开始流。

const fs = require('fs');


const readable_stream = fs.createReadStream('file_path');

解决方案2

如果您不想创建文件,您可以创建一个内存流并对其进行处理(例如,将其上传到某个地方)。你可以用stream模块来做到这一点。你可以从stream模块导入Readable,并创建一个可读流。创建对象时,还可以实现read()方法,用于从内部缓冲区读取数据。如果没有可读取的数据,则返回null。可选参数size指定要读取的特定字节数。如果未指定size参数,则将返回内部缓冲区中包含的所有数据。

const Readable = require('stream').Readable;


const readable_stream = new Readable({
​read(size) {
​// ...
​  }
});

解决方案3

当你通过网络获取一些东西时,它可以像流一样被获取(例如,你从一些API获取一个PDF文档)。

const axios = require('axios');


const readable_stream = await axios({
method: 'get',
url: "pdf_resource_url",
responseType: 'stream'
}).data;

解决方案4

第三方包可以支持创建流作为一个特性。这是aws-sdk包的一种方式,通常用于将文件上传到S3

const file = await s3.getObject(params).createReadStream();