How to use stream.pipeline in an AWS Node.js Lambda


Question

I am trying to stream data from a MongoDB cursor into an S3 file using a Node.js Lambda.

The following is a snippet of my code.

What I observe is that the Lambda does not wait for the pipeline to complete and exits early, so the file is never written to S3.

But the same code works fine if I run it as a standalone Node.js script.

const logger = require('./logger').logger;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline);

exports.handler = async (event, context) => {

    //client is a connected MongoClient and criteriaObj the aggregation
    //criteria, both defined elsewhere in the module
    await pipeline(
        client.db("somedb").collection("somecollection").aggregate(criteriaObj).stream({transform: x => `${JSON.stringify(x)}\n`}),
        uploadFromStream()
    );

};

let uploadFromStream = () => {

    let pass = new stream.PassThrough();
    let s3 = new s3Client();
    let fileName = "filename";

    let params = {Bucket: "bucketname", Key: fileName, Body: pass};

    s3.upload(params, function(err, data) {
        if (err) {
            logger.error(`Error uploading file ${fileName}`, err);
        } else {
            logger.info(`Successfully uploaded file: ${fileName}, result: ${JSON.stringify(data)}`);
        }
    });

    return pass;
};

Answer

I ended up doing it without async/await.

My code ended up looking like the snippet below. I have also written a blog post about it at: https://dev.to/anandsunderraman/copying-over-data-from-mongodb-to-s3-3j4g

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;

//streams the aggregation results from mongodb straight into an s3 file
exports.copyData = (event, context, callback) => {

    MongoClient.connect(getDBURI(), {
            useNewUrlParser: true,
            useUnifiedTopology: true
    }).then((dbConnection) => {

        pipeline(
            dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
                                        .stream({transform: x => convertToNDJSON(x)}),
            uploadDataToS3(callback),
            (err) => {
                if (err) {
                    console.log('Pipeline failed.', err);
                    //report the failure so the lambda does not hang waiting for the callback
                    callback(err);
                } else {
                    console.log('Pipeline succeeded.');
                }
            }
        )

    })

}
/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};
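
/**
 * Not from the original answer: a hedged sketch of the "fetch the password
 * from AWS Parameter Store" best practice mentioned in the comment above,
 * using the aws-sdk v2 SSM client. The parameter name
 * '/myapp/mongodb/password' is hypothetical.
 */
const ssmClient = new (require('aws-sdk/clients/ssm'))();
const getDBPassword = async () => {
    const result = await ssmClient.getParameter({
        Name: '/myapp/mongodb/password', //hypothetical parameter name
        WithDecryption: true
    }).promise();
    return result.Parameter.Value;
};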

//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    return JSON.stringify(data) + "\n";
};

let uploadDataToS3 = (callback) => {
    let env = process.env.NODE_ENV; //assumes NODE_ENV is set to 'local' when testing against minio
    let s3 = null;
    let pass = new stream.PassThrough();
    if (env === 'local') {
        s3 = new s3Client({
            accessKeyId: 'minioadmin',
            secretAccessKey: 'minioadmin',
            endpoint: 'http://host.docker.internal:9000',
            s3ForcePathStyle: true, // needed with minio?
            signatureVersion: 'v4'
        });
    } else {
        s3 = new s3Client();
    }
    const fileName = '<file-name>';
    //the PassThrough stream is both the pipeline destination and the upload body
    let params = {Bucket: '<your-bucket-name>', Key: fileName, Body: pass};
    //using multipart upload to speed up the process
    let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};

    s3.upload(params, opts, function(err, data) {
        if (err) {
            console.log(`Error uploading file ${fileName}`, err);
        } else {
            console.log(`Successfully uploaded file: ${fileName}, result: ${JSON.stringify(data)}`);
        }
        //invoking the lambda callback here makes the lambda wait for the upload to finish
        callback();
    });
    return pass;

};
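
As an aside, the original async/await approach can also be made to work. The handler's promise was resolving as soon as the pipeline had flushed all data through the PassThrough, while the S3 managed upload was still completing in the background, so the Lambda returned before the upload finished. Below is a minimal sketch, assuming aws-sdk v2, where s3.upload() returns a ManagedUpload whose .promise() resolves only once the upload has completed; sourceStream stands in for the MongoDB cursor stream from the question.

const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline);
const s3Client = require('aws-sdk/clients/s3');

exports.handler = async (event, context) => {
    const s3 = new s3Client();
    const pass = new stream.PassThrough();
    //start the upload first so it consumes the PassThrough while the pipeline fills it
    const upload = s3.upload({Bucket: '<your-bucket-name>', Key: '<file-name>', Body: pass}).promise();
    //wait for both the pipeline to drain and S3 to acknowledge the upload
    await Promise.all([
        pipeline(sourceStream, pass), //sourceStream: the aggregation cursor stream from the question
        upload
    ]);
};

Awaiting both promises keeps the handler alive until S3 has acknowledged the whole object, which is what the callback-based version above achieves by deferring callback() to the upload callback.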
