如何在AWS Node.js Lambda中使用stream.pipeline [英] How to use stream.pipeline in aws nodejs lambda
问题描述
我正在尝试使用nodejs lambda将数据从mongodb游标流式传输到s3文件中.
I am trying to stream the data from a mongodb cursor into an s3 file using a nodejs lambda.
以下是我的代码的一个片段.
Following is a snippet of my code.
我观察到的是lambda不会等待管道完成就退出了, 因此该文件不会写入s3.
What I observe is that the lambda does not wait for the pipeline to complete and exits, so the file is not written to s3.
但是如果我将其作为独立的node.js脚本运行,同样可以正常工作.
But the same works fine if I run it as a standalone node.js script.
const logger = require('./logger').logger;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline);
exports.handler = async (event, context) => {
await pipeline(
client.db("somedb").collection("somecollection").aggregate(crtiriaObj).stream({transform: x => `${JSON.stringify(x)}\n`}),
uploadFromStream()
)
};
/**
 * Creates the writable end of the pipeline: a PassThrough stream that
 * s3.upload consumes as the object body.
 * @returns {stream.PassThrough} stream to be fed by the pipeline source
 */
let uploadFromStream = () => {
  let pass = new stream.PassThrough();
  let s3 = new s3Client();
  let params = {Bucket: "bucketname", Key: "filename", Body: pass};
  s3.upload(params, function(err, data) {
    if (err) {
      // BUG FIX: `fileName` was never declared here and threw a
      // ReferenceError inside the callback; log the S3 key instead.
      logger.error(`Error uploading file ${params.Key}`,err);
    } else {
      logger.info(`Successfully uploaded file: ${params.Key}, result: ${JSON.stringify(data)}`);
    }
  });
  return pass;
};
推荐答案
我最终没有做异步/等待方式.
I ended up doing it without async / await fashion.
我的代码最终看起来像下面的代码片段. 我还在以下位置写了一篇博客文章: https://dev.to/anandsunderraman/copying-over-data-from-mongodb-to-s3-3j4g
My code ended up looking like the snippet below. I have also written a blogpost about it at: https://dev.to/anandsunderraman/copying-over-data-from-mongodb-to-s3-3j4g
const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;
//brute force method loading all the data into an array
exports.copyData = (event, context, callback) => {
MongoClient.connect(getDBURI(), {
useNewUrlParser: true,
useUnifiedTopology: true
}).then((dbConnection) => {
pipeline(
dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
.stream({transform: x => convertToNDJSON(x)}),
uploadDataToS3(callback),
(err) => {
if (err) {
console.log('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
})
}
/**
 * Builds the MongoDB connection URI for the target environment.
 * Best practice is to fetch the password from the AWS Parameter Store
 * rather than hard-coding it here.
 * @returns {string} the MongoDB connection string
 */
const getDBURI = () => "mongodb://<username>:<password>@<hostname>/<your-db-name>";
// Serializes a single DB record as ndjson (newline-delimited JSON).
let convertToNDJSON = (data) => `${JSON.stringify(data)}\n`;
/**
 * Creates the writable end of the pipeline: a PassThrough stream that is
 * handed to s3.upload as the object body (multipart upload).
 * @param {Function} callback - Lambda completion callback, invoked once
 *   the upload finishes (with the error, if any)
 * @returns {stream.PassThrough} stream to be fed by the pipeline source
 */
let uploadDataToS3 = (callback) => {
  let env = process.env;
  let s3 = null;
  let pass = new stream.PassThrough();
  // BUG FIX: the original compared the whole process.env *object* to the
  // string 'local' (always false), so the minio branch was unreachable.
  // NOTE(review): assumes the deployment sets NODE_ENV — confirm the
  // actual variable name used to flag a local run.
  if (env.NODE_ENV === 'local') {
    s3 = new s3Client({
      accessKeyId: 'minioadmin' ,
      secretAccessKey: 'minioadmin' ,
      endpoint: 'http://host.docker.internal:9000' ,
      s3ForcePathStyle: true, // needed with minio?
      signatureVersion: 'v4'
    });
  } else {
    s3 = new s3Client();
  }
  //using multipart upload to speed up the process
  // BUG FIX: Body was `data`, an undeclared variable (ReferenceError);
  // the PassThrough stream is the intended body.
  let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: pass};
  let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
  s3.upload(params,opts, function(err, data) {
    if (err) {
      // BUG FIX: `${file-name}` parsed as `file - name` on two undeclared
      // identifiers (ReferenceError); log the S3 key instead.
      console.log(`Error uploading file ${params.Key}`,err);
    } else {
      console.log(`Successfully uploaded file: ${params.Key}, result: ${JSON.stringify(data)}`);
    }
    // BUG FIX: propagate the upload error to Lambda instead of
    // signalling unconditional success.
    callback(err);
  });
  return pass;
};
这篇关于如何在AWS Node.js Lambda中使用stream.pipeline的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!