将一个流管道连接到 s3.load()

我目前正在使用一个名为 S3-上载-流的 node.js 插件将非常大的文件流传输到 Amazon S3。它使用多部分 API,并且在大多数情况下工作得非常好。

但是,这个模块显示了它的年代,我已经对它进行了修改(作者也不赞成使用它)。今天我遇到了 Amazon 的另一个问题,我真的很想接受作者的建议,开始使用官方的 aws-sdk 来完成我的上传。

但是。

官方的 SDK 似乎不支持到 s3.upload()的管道。上传的本质是必须将可读流作为参数传递给 S3构造函数。

我有大约120多个用户代码模块,它们执行各种文件处理,并且它们不知道输出的最终目的地。引擎给他们一个可管道写的输出流,然后他们通过管道输出。我不能给他们一个 AWS.S3对象,并要求他们调用它的 upload()没有添加代码的所有模块。我之所以使用 s3-upload-stream是因为它支持管道。

有没有办法让 aws-sdk s3.upload()的东西,我可以管道流?

135665 次浏览

如果你知道流的大小,你可以使用 Minio-J来上传这样的流:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
if (e) {
return console.log(e)
}
console.log("Successfully uploaded the stream")
})

用 node.js stream.PassThrough()流包装 S3 upload()函数。

这里有一个例子:

inputStream
.pipe(uploadFromStream(s3));


function uploadFromStream(s3) {
var pass = new stream.PassThrough();


var params = {Bucket: BUCKET, Key: KEY, Body: pass};
s3.upload(params, function(err, data) {
console.log(err, data);
});


return pass;
}

如果这对任何人有帮助的话,我可以成功地从客户端流到 s3:

Https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

服务器端代码假定 req是一个流对象,在我的例子中,它是从客户端发送的,在头中设置了文件信息。

const fileUploadStream = (req, res) => {
//get "body" args from header
const { id, fn } = JSON.parse(req.get('body'));
const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
const params = {
Key,
Bucket: bucketName, //set somewhere
Body: req, //req is a stream
};
s3.upload(params, (err, data) => {
if (err) {
res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
} else {
res.send(Key);
}
});
};

是的,它打破了惯例,但是如果你看一下要点,它比我用 Multer,餐馆服务员等发现的任何东西都要干净。.

实用主义 + 1,感谢@SalehenRahman 的帮助。

在已接受的答案中,函数在上传完成之前结束,因此,它是不正确的。下面的代码正确地从一个可读的流导出。

上传参考

async function uploadReadableStream(stream) {
const params = {Bucket: bucket, Key: key, Body: stream};
return s3.upload(params).promise();
}


async function upload() {
const readable = getSomeReadableStream();
const results = await uploadReadableStream(readable);
console.log('upload complete', results);
}

您还可以进一步使用 ManagedUpload输出进度信息:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

ManagedUpload 引用

可用事件列表

回答晚了点,也许能帮到别人。您可以同时返回可写流和承诺,这样您就可以在上传结束时获得响应数据。

const AWS = require('aws-sdk');
const stream = require('stream');


const uploadStream = ({ Bucket, Key }) => {
const s3 = new AWS.S3();
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
};
}

你可以使用如下函数:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');


const pipeline = readStream.pipe(writeStream);

现在你可以检查承诺:

promise.then(() => {
console.log('upload completed successfully');
}).catch((err) => {
console.log('upload failed.', err.message);
});

或使用异步/等待:

try {
await promise;
console.log('upload completed successfully');
} catch (error) {
console.log('upload failed.', error.message);
}

或者当 stream.pipe()返回流时。可写的目标(上面的 writeStream 变量) ,允许管道链,我们也可以使用它的事件:

 pipeline.on('close', () => {
console.log('upload successful');
});
pipeline.on('error', (err) => {
console.log('upload failed', err.message)
});

打字脚本解决方案:
这个例子使用:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

异步函数:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> {


const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
const passT = new stream.PassThrough();
return {
writeStream: passT,
promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
};
};
const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
let output = true;
await promise.catch((reason)=> { output = false; console.log(reason);});
return output;
}

将此方法称为:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);

对于那些抱怨当他们使用 s3 api 上传函数和一个零字节文件结束在 s3(@Radar155和@gabo)-我也有这个问题。

创建第二个 PassThrough 流,将所有数据从第一个流传输到第二个流,并将对第二个流的引用传递给 s3。您可以通过几种不同的方式来实现这一点——可能一种肮脏的方式是侦听第一个流上的“ data”事件,然后将相同的数据写入第二个流——类似于“ end”事件——只需调用第二个流上的 end 函数。我不知道这是 aws api 中的 bug、节点的版本还是其他什么问题——但它解决了我的问题。

看起来可能是这样的:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();


var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
destStream.write(chunk);
});


srcStream.on('end', function () {
dataStream.end();
});

我正在使用 KnexJS,在使用它们的流 API 时遇到了问题。我终于把它修好了,希望下面的内容能帮到别人。

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();


knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());


const uploadResult = await s3
.upload({
Bucket: 'my-bucket',
Key: 'stream-test.txt',
Body: passThroughStream
})
.promise();

没有一个答案对我有用,因为我想:

  • 接入 s3.upload()
  • s3.upload()的结果导入另一个流

接受的答案不会做后者。其他的依赖于承诺 api,这在处理流管道时很麻烦。

这是我对公认答案的修改。

const s3 = new S3();


function writeToS3({Key, Bucket}) {
const Body = new stream.PassThrough();


s3.upload({
Body,
Key,
Bucket: process.env.adpBucket
})
.on('httpUploadProgress', progress => {
console.log('progress', progress);
})
.send((err, data) => {
if (err) {
Body.destroy(err);
} else {
console.log(`File uploaded and available at ${data.Location}`);
Body.destroy();
}
});


return Body;
}


const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});


pipeline.on('close', () => {
// upload finished, do something else
})
pipeline.on('error', () => {
// upload wasn't successful. Handle it
})

在上述最为普遍接受的答案中需要注意的是: 如果使用管道,则需要返回函数中的传递,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () {
let pass = new stream.PassThrough();
return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

否则,它将悄悄地移动到下一个,而不抛出一个错误,或者将抛出一个 TypeError: dest.on is not a function错误,具体取决于您如何编写函数

遵循其他答案,并使用最新的 AWS SDK for Node.js,有一个更简洁的解决方案,因为 s3上传()函数接受一个流,使用 wait 语法和 S3的承诺:

var model = await s3Client.upload({
Bucket : bucket,
Key : key,
ContentType : yourContentType,
Body : fs.createReadStream(path-to-file)
}).promise();

如果您使用的是 AWS 节点 SDK v3,那么有一个专用的模块用于上传流/blobs/buffer。

Https://www.npmjs.com/package/@aws-sdk/lib-storage

为其创建一个 new stream.PassThrough()pipe输入流,然后将 passthrough 实例传递给主体。

查看下面的例子:

function upload(s3, inputStream) {
const pass = new PassThrough();


inputStream.pipe(pass);


return s3.upload(
{
Bucket: 'bucket name',
Key: 'unique file name',
Body: pass,
},
{
queueSize: 4, // default concurrency
},
).promise()
.then((data) => console.log(data))
.catch((error) => console.error(error));
}


我认为值得更新 AWS SDK v3的答案:)。

S3客户端不再具有 upload功能,而是按照 https://github.com/aws/aws-sdk-js-v3/blob/main/lib/lib-storage/README.md建议使用 @aws-sdk/lib-storage

因此,生成的代码片段应该如下所示:

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const stream = require('stream');


...


const client = new S3Client({
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
},
region: process.env.AWS_DEFAULT_REGION,
});


...


async function uploadStream(readableStream) {


const Key = 'filename.pdf';
const Bucket = 'bucket-name';
const passThroughStream = new stream.PassThrough();


let res;


try {
const parallelUploads3 = new Upload({
client,
params: {
Bucket,
Key,
Body: passThroughStream,
ACL:'public-read',
},
queueSize: 4,
partSize: 1024 * 1024 * 5,
leavePartsOnError: false,
});


readableStream.pipe(passThroughStream);
res = await parallelUploads3.done();
} catch (e) {
console.log(e);
}


return res;
}