AWS lambda transformation to firehose: Python

Problem description

I want to transform AWS Kinesis stream data with a Lambda function and then deliver it to S3 using AWS Firehose. However, I always encounter this error: "errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."

This is the lambda_function.


import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'], # is this the problem? I used sequenceNumber, it is not right. 
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}

A related question was posted here: Kinesis Firehose lambda transformation. However, the Kinesis data format in that question seems different from what I receive. The events I get look like the following: the top-level key is "Records" with a capital R, not "records", and there is no "recordId", only "eventID".

{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0', 
                'partitionKey': '1', 
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762', 
                'data':'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=', 
                'approximateArrivalTimestamp': 1596314234.567
            }, 
            'eventSource': 'aws:kinesis', 
            'eventVersion': '1.0', 
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762', 
            'eventName': 'aws:kinesis:record', 
            'invokeIdentityArn':'xxx', 
            'awsRegion': 'us-east-1', 
            'eventSourceARN': 'xxx'
        }
    ]
}
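The "data" field under "kinesis" in this event is base64-encoded. A minimal sketch for checking what it actually carries; the helper name decode_kinesis_records is hypothetical, and it assumes every payload is UTF-8 JSON, as in the event above:

```python
import base64
import json


def decode_kinesis_records(event):
    """Decode the base64 payload of each record in a Kinesis trigger event.

    Assumes each payload is UTF-8 encoded JSON (hypothetical for other streams).
    """
    payloads = []
    for record in event["Records"]:  # capital R: Kinesis event source format
        raw = base64.b64decode(record["kinesis"]["data"])
        payloads.append(json.loads(raw.decode("utf-8")))
    return payloads


sample_event = {
    "Records": [
        {
            "kinesis": {
                "data": "eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=",
            },
            "eventID": "shardId-000000000000:49603262076998903856573762341186472148109820820203765762",
        }
    ]
}

print(decode_kinesis_records(sample_event))
# → [{'prop': '79', 'timestamp': '1596314234', 'thing_id': 'aa-bb'}]
```

Each record exposes eventID and kinesis.sequenceNumber, but nothing named recordId, which is why the Firehose-style response cannot be built from it directly.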

Solution

It depends upon how you've configured your Kinesis, Firehose and Lambda pipeline.

If your Kinesis stream triggers a Lambda that delivers the data to Firehose, then you'll be dealing with the Kinesis record event. Check out Using AWS Lambda with Amazon Kinesis. Sample event below:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
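In this first setup, the Lambda has to push the transformed records to Firehose itself, so the recordId/result/data response shape does not apply. A minimal sketch, assuming boto3 (bundled with the AWS Lambda Python runtime), a hypothetical DELIVERY_STREAM_NAME environment variable, and a placeholder transform function standing in for real business logic:

```python
import base64
import json
import os

# Hypothetical environment variable holding the target delivery stream name.
DELIVERY_STREAM = os.environ.get("DELIVERY_STREAM_NAME", "my-delivery-stream")
MAX_BATCH = 500  # PutRecordBatch accepts at most 500 records per call


def transform(payload):
    """Placeholder business logic (hypothetical): tag each payload."""
    return {**payload, "name": "this is a test"}


def build_firehose_records(event):
    """Turn a Kinesis trigger event ("Records", capital R) into PutRecordBatch entries."""
    entries = []
    for record in event["Records"]:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Newline-delimited JSON keeps objects separable once Firehose concatenates them in S3.
        line = json.dumps(transform(payload)) + "\n"
        # Pass raw bytes; boto3 base64-encodes blob fields itself.
        entries.append({"Data": line.encode("utf-8")})
    return entries


def lambda_handler(event, context, firehose=None):
    # The optional firehose argument lets a fake client be injected for testing.
    if firehose is None:
        import boto3
        firehose = boto3.client("firehose")
    entries = build_firehose_records(event)
    failed = 0
    for start in range(0, len(entries), MAX_BATCH):
        response = firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=entries[start:start + MAX_BATCH],
        )
        failed += response["FailedPutCount"]
    print("Sent {} records to Firehose, {} failed.".format(len(entries), failed))
    return {"sent": len(entries), "failed": failed}
```

Firehose does not read this function's return value in this setup. The sketch only counts FailedPutCount; a production version would likely retry the entries whose RequestResponses item contains an ErrorCode.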

Another setup is to have Firehose poll the Kinesis stream directly. In that case you also get the flexibility to set up a transformation Lambda for Firehose (Amazon Kinesis Data Firehose Data Transformation). In this setup, the sample event looks as follows (Using AWS Lambda with Amazon Kinesis Data Firehose):

{
  "invocationId": "invoked123",
  "deliveryStreamArn": "aws:lambda:events",
  "region": "us-west-2",
  "records": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record1",
      "approximateArrivalTimestamp": 1510772160000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000000",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
        "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
        "subsequenceNumber": ""
      }
    },
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record2",
      "approximateArrivalTimestamp": 151077216000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000001",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
        "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
        "subsequenceNumber": ""
      }
    }
  ]
}
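In this second setup, Firehose expects the response shape the question's code already uses, with one entry per incoming record: recordId copied from the input record, result set to "Ok", "Dropped" or "ProcessingFailed", and base64-encoded data. A minimal sketch for this event format; the JSON wrapping is a hypothetical stand-in for real business logic:

```python
import base64
import json


def lambda_handler(event, context):
    output = []
    for record in event["records"]:  # lowercase: Firehose transformation format
        try:
            raw = base64.b64decode(record["data"]).decode("utf-8")
            # Hypothetical business logic: wrap the original payload in a JSON object.
            transformed = json.dumps({"name": "this is a test", "original": raw}) + "\n"
            output.append({
                "recordId": record["recordId"],  # must echo the incoming recordId
                "result": "Ok",
                "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
            })
        except ValueError:  # covers binascii.Error and UnicodeDecodeError
            # Return the original data unchanged and flag the record as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    print("Processed {} records.".format(len(output)))
    return {"records": output}
```

The key difference from the question's code is that recordId comes from record["recordId"], which only exists when Firehose itself invokes the function.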

  1. The Kinesis Firehose lambda transformation question seems to be concerned with the second type of setup.
  2. Your data pipeline seems to be using the first type of setup.
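A quick way to tell which setup is invoking the function is to look at the event's top-level keys, the same difference the question noticed ("Records" vs "records", "eventID" vs "recordId"). A hypothetical heuristic helper, relying only on the key names in the two sample events above:

```python
def detect_event_source(event):
    """Guess which pipeline setup invoked the Lambda from the event shape.

    Heuristic only: based on the key names in the two sample events.
    """
    if "records" in event and "deliveryStreamArn" in event:
        # Second setup: must return recordId/result/data for every record.
        return "firehose-transformation"
    records = event.get("Records", [])
    if records and records[0].get("eventSource") == "aws:kinesis":
        # First setup: the function must send data to Firehose itself.
        return "kinesis-trigger"
    return "unknown"
```

Logging this value once from inside the handler would show which of the two response contracts the function actually has to satisfy.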
