如何在异步函数中创建AWS S3对象的读取流? [英] How to create a read stream of a AWS S3 object in a async function?

查看:63
本文介绍了如何在异步函数中创建AWS S3对象的读取流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何在异步函数中创建AWS S3对象的读取流?

How to create a read stream of a AWS S3 object in a async function?

如果我尝试

exports.handler = async (event) => {
  var csvreadstream = await s3.getObject({ Bucket: bucket, Key: filename }).promise().createReadStream()
}

exports.handler = async (event) => {
  var s3Object = await s3.getObject({ Bucket: bucket, Key: filename }).promise();
  var csvreadstream = s3Object.createReadStream();
}

我知道

{
  "errorType": "TypeError",
  "errorMessage": "(intermediate value).createReadStream is not a function",
  "trace": [
    "TypeError: (intermediate value).createReadStream is not a function",
    "    at Runtime.exports.handler (/var/task/app.js:29:86)",
    "    at processTicksAndRejections (internal/process/task_queues.js:94:5)"
  ]
}

任何人都可以建议如何以异步(异步/等待方式)功能从S3对象创建读取流吗?谢谢!

Can anyone advice how to create a read stream from an S3 object in an async (async/await manner) function? Thank you!

感谢Mark B,我走了一步:

Thanks to Mark B I am a step further:

const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const stream = require('stream');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);
    
    var errors = [];

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';
    
    var s3object = (await s3.getObject({ Bucket: bucket, Key: filename }).promise());
    var csvreadstream = new stream.Readable();
    csvreadstream._read = () => {};
    csvreadstream.push(s3object.Body);
   
    csvreadstream
    .pipe(csv.parse({ headers: true }))
    .on('data', async function(data){
        this.pause();
        console.log("DATA: " + data);
        await utils.filterLogic(data, errors);
        this.resume();
    })
    .on('end', async function(){
        console.log("END");
        await utils.writeErrorReport(errors, s3, reportBucket, reportFilename);
    })
};

但是,流似乎没有得到处理,就像调用 .on()是否有人建议如何在异步功能中处理读取流?非常感谢您的帮助.

However, the stream seems not to get processed, like calling .on() Does anyone have an advice how to process the read stream in an async function? Thanks a lot for your healp.

推荐答案

在您的代码中, s3Object.Body 将返回一个Buffer.如果您需要转换缓冲区,则可以在此处中查看类似的技术.到流.

In your code s3Object.Body would return a Buffer. You could then look at a technique like the answers here if you need to convert a Buffer to a Stream.

这篇关于如何在异步函数中创建AWS S3对象的读取流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆