AWS Lambda transformation to Firehose: Python
I want to transform AWS Kinesis stream data with a Lambda function and then deliver it to S3 using AWS Firehose. However, I keep running into this error: `"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."`
This is my `lambda_function`:
```python
import base64
import json


def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'],  # is this the problem? I used sequenceNumber, it is not right.
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}
```
A related question was posted here: Kinesis Firehose lambda transformation. However, the Kinesis data format there seems to differ from what I get. The events I receive look like the following: the top-level key is `Records` (capital R), not `records`, and there is no `recordId`, only `eventID`.
```python
{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0',
                'partitionKey': '1',
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762',
                'data': 'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=',
                'approximateArrivalTimestamp': 1596314234.567
            },
            'eventSource': 'aws:kinesis',
            'eventVersion': '1.0',
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762',
            'eventName': 'aws:kinesis:record',
            'invokeIdentityArn': 'xxx',
            'awsRegion': 'us-east-1',
            'eventSourceARN': 'xxx'
        }
    ]
}
```
It depends on how you've configured your Kinesis, Firehose, and Lambda pipeline.
If your Kinesis stream triggers a Lambda function that delivers the data to Firehose, then you'll be dealing with the Kinesis record event; see Using AWS Lambda with Amazon Kinesis. A sample event is shown below:
```json
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```
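For this first setup, a Kinesis-triggered function that forwards records to Firehose might look roughly like the sketch below. It is not the asker's code and makes several assumptions: payloads are JSON objects, the delivery stream name comes from a hypothetical `DELIVERY_STREAM_NAME` environment variable, `transform` stands in for your business logic, and the optional `firehose_client` parameter exists only so the logic can be exercised without AWS. It reads `Records` and `kinesis.data` and calls boto3's `put_record_batch` (at most 500 records per call) instead of returning a `{'records': [...]}` response, since the Kinesis trigger does not expect that format.

```python
import base64
import json
import os

# Assumption: the target delivery stream name is supplied through a
# hypothetical DELIVERY_STREAM_NAME environment variable.
DELIVERY_STREAM = os.environ.get("DELIVERY_STREAM_NAME", "my-delivery-stream")
MAX_BATCH = 500  # PutRecordBatch accepts at most 500 records per call


def transform(payload):
    """Placeholder business logic: add a field to the decoded JSON object."""
    return {**payload, "name": "this is a test"}


def to_firehose_records(event):
    """Decode each Kinesis record and build PutRecordBatch entries."""
    records = []
    for record in event["Records"]:  # capital "R" in the Kinesis trigger event
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # One JSON object per line keeps the S3 objects newline-delimited.
        line = json.dumps(transform(payload)) + "\n"
        records.append({"Data": line.encode("utf-8")})
    return records


def lambda_handler(event, context, firehose_client=None):
    if firehose_client is None:
        import boto3  # imported lazily so the helpers run without AWS libraries
        firehose_client = boto3.client("firehose")
    records = to_firehose_records(event)
    for start in range(0, len(records), MAX_BATCH):
        batch = records[start:start + MAX_BATCH]
        response = firehose_client.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM, Records=batch
        )
        if response["FailedPutCount"] > 0:
            # Failing the invocation makes the Kinesis trigger retry the whole
            # batch, so records that did succeed may be delivered twice.
            raise RuntimeError(f"{response['FailedPutCount']} records were not accepted")
    print(f"Forwarded {len(records)} records to {DELIVERY_STREAM}.")
    return {"forwarded": len(records)}
```

Raising on a partial failure is the simplest choice here; retrying only the failed entries from `RequestResponses` would avoid duplicates at the cost of more code.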
Another setup is to have Firehose read from the Kinesis stream directly (the stream is configured as the delivery stream's source). In that case you can also attach a transformation Lambda to Firehose (see Amazon Kinesis Data Firehose Data Transformation). In this setup, the sample event looks like this (from Using AWS Lambda with Amazon Kinesis Data Firehose):
```json
{
    "invocationId": "invoked123",
    "deliveryStreamArn": "aws:lambda:events",
    "region": "us-west-2",
    "records": [
        {
            "data": "SGVsbG8gV29ybGQ=",
            "recordId": "record1",
            "approximateArrivalTimestamp": 1510772160000,
            "kinesisRecordMetadata": {
                "shardId": "shardId-000000000000",
                "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
                "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
                "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
                "subsequenceNumber": ""
            }
        },
        {
            "data": "SGVsbG8gV29ybGQ=",
            "recordId": "record2",
            "approximateArrivalTimestamp": 151077216000,
            "kinesisRecordMetadata": {
                "shardId": "shardId-000000000001",
                "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
                "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
                "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
                "subsequenceNumber": ""
            }
        }
    ]
}
```
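In this second setup, the transformation function receives lowercase `records` and must return one output record per input, echoing the incoming `recordId` unchanged, setting `result` to `Ok`, `Dropped`, or `ProcessingFailed`, and supplying base64-encoded `data`. A minimal sketch, assuming JSON-object payloads and reusing the question's placeholder logic of adding a `name` field:

```python
import base64
import json


def lambda_handler(event, context):
    output = []
    for record in event["records"]:  # lowercase "records" in the Firehose event
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            payload["name"] = "this is a test"  # placeholder business logic
            line = json.dumps(payload) + "\n"
            output.append({
                "recordId": record["recordId"],  # must echo the incoming ID unchanged
                "result": "Ok",
                "data": base64.b64encode(line.encode("utf-8")).decode("utf-8"),
            })
        except (ValueError, TypeError):
            # Non-JSON (or non-object) payload: hand the original data back
            # marked as failed instead of raising, so the other records in
            # the batch can still be delivered.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    print(f"Successfully processed {len(event['records'])} records.")
    return {"records": output}
```

With the sample event above, both records decode to `Hello World`, which is not JSON, so this sketch would mark them `ProcessingFailed`; Firehose then treats them as unsuccessfully processed rather than failing the whole invocation.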
- The related question, Kinesis Firehose lambda transformation, concerns the second type of setup.
- Your pipeline appears to use the first type of setup, which is why your events contain `Records` and `eventID` instead of `records` and `recordId`.
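If you are unsure which setup is invoking your function, one heuristic is to inspect the event shape: a Kinesis trigger sends capital `Records` whose entries carry `eventSource` set to `aws:kinesis`, while a Firehose transformation sends lowercase `records` alongside `deliveryStreamArn`. A small sketch (the function name `detect_setup` and its return labels are hypothetical):

```python
def detect_setup(event):
    """Heuristically tell which pipeline setup invoked the function."""
    # Firehose transformation events carry lowercase "records" plus stream metadata.
    if "records" in event and "deliveryStreamArn" in event:
        return "firehose-transformation"
    # Kinesis trigger events carry capital "Records" tagged with eventSource.
    records = event.get("Records") or []
    if records and records[0].get("eventSource") == "aws:kinesis":
        return "kinesis-trigger"
    return "unknown"
```

Applied to the event in the question, this returns `"kinesis-trigger"`.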