Kinesis lambda DynamoDB [英] Kinesis lambda DynamoDB

查看:149
本文介绍了Kinesis lambda DynamoDB的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习一个用例的AWS服务。浏览完文档后,我想到了一个简单的流程。我想通过使用Streams API和KPL将数据摄取到Kinesis流中。我使用示例putRecord方法将数据提取到流中。我正在将此JSON提取到流中-

  { userid:1234, username: jDoe, firstname: John, lastname: Doe} 

一旦数据被提取,我在putRecordResult中得到以下响应-

 放入结果:{ShardId:shardId-000000000000,SequenceNumber:49563097246355834103398973318638512162631666140828401666} 
Put结果:{ShardId:shardId-000000000000,SequenceNumber:49563097246355834103398973318645765717549353915876638722}
Put结果:{ShardId:shardId-000000000000,SequenceNumber:49563097246355355834103103973318649392495008197803400757250}

现在,我编写一个Lambda函数来获取这些数据并将其推入DynamoDB表中。这是我的Lambda函数-

  console.log(加载函数); 
var AWS = require('aws-sdk');
var tableName = sampleTable;
var doc = require(‘dynamodb-doc’);
var db = new doc.DynamoDB();

exports.handler =(事件,上下文,回调)=> {
//console.log(‘接收到的事件:’,JSON.stringify(event,null,2));
event.Records.forEach((record)=> {
// Kinesis数据是base64编码的,因此在此处解码
const有效负载= new Buffer(record.kinesis.data,'base64' ).toString('ascii');
console.log('解码有效载荷:',有效载荷);
var userid = event.userid;
var username = event.username;
var firstname = event.firstname;
console.log(userid +, +用户名+, + firstname);

var item = {
userid :userid,
username:用户名,
firstname:名字
};

var params = {
TableName:tableName,
项目:item
};
console.log(params);

db.putItem(params,function(err,data){
if( err)console.log(err);
else console.log(data);
});

});
回调(空,`已成功处理$ {event.Records.length}个记录。);
};

我无法在lambda函数执行中看到console.logs。我在流页面中看到已将putRecord放到流中,并且得到了,但是以某种方式,我在Lambdafunction页面或DynamoDB表中什么也看不到。



对于Java代码,我有一个IAM策略,用于将数据摄取到Kinesis中;对于Lambda函数,另一个是lambda-kinesis-execution-role;还有一个策略以便DynamoDB将数据提取到表中。



是否有任何教程显示正确方式的完成方式?我感觉到我在此过程中遗漏了许多要点,例如如何链接所有这些IAM策略并使它们同步,以便在将数据放入流中时由Lambda处理并最终在Dynamo中进行? / p>

任何指针和帮助都深表感谢。

解决方案

如果上面的re代码是您正在使用的代码的直接副本,您引用的是 event.userid ,但是您应该使用 payload.userid 。您已将Kinesis记录解码为有效负载变量。


I am learning the AWS services for a use case. After going through the docs I came came up with the a simple flow. I want to ingest data into the Kinesis streams by using the Streams API and the KPL. I use the example putRecord method to ingest data to the streams. I am ingesting the this JSON to the stream -

{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}

Once the data is ingested i get the following response in putRecordResult -

Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}

Now I write a Lambda function to get these data and push into a DynamoDB table. Here is my Lambda function -

console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();

exports.handler = (event, context, callback) => {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach((record) => {
        // Kinesis data is base64 encoded so decode here
        const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
        var userid = event.userid;
        var username = event.username;
        var firstname = event.firstname;
        console.log(userid + "," + username +","+ firstname);

        var item = {
            "userid" : userid,
            "username" : username,
            "firstname" : firstname
        };

        var params = {
            TableName : tableName,
            Item : item
        };
        console.log(params);

        db.putItem(params, function(err, data){
            if(err) console.log(err);
            else console.log(data);
        });

    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};

Somehow I am not able to see the console.logs in the lambda functions execution. I see in the streams page there have been putRecord to the stream and get as well but somehow i can see nothing in the Lambdafunction page nor in the DynamoDB table.

I have an IAM policy for the Java code for the ingestion of the data into Kinesis, another for the Lambda function that is lambda-kinesis-execution-role and a policy for the DynamoDB to ingest data into the tables.

Is there any tutorial that shows how it is done in the right way? I am getting a feeling that I am missing many points in this process for example how to link all those IAM policies and make them in sync so that when the data is put into the stream it is processed by Lambda and ends up in Dynamo?

Any pointers and help is deeply appreciated.

解决方案

If you're code above is a direct copy of the code you're using, you're referencing event.userid but you should be using payload.userid. You've decoded the Kinesis record into the payload variable.

这篇关于Kinesis lambda DynamoDB的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆