DynamoDB not receiving the entire SQS message body


Problem description


I am pulling data from an API in batches and sending it to an SQS queue. Where I am having an issue is processing the messages in order to send the data to DynamoDB. There are supposed to be 147,689 records in the dataset. However, when running the code, sometimes fewer than 147,689 records are put into DynamoDB, sometimes more than 147,689, and sometimes exactly 147,689. It does not consistently put 147,689 records into the database.


I have tried everything I can think of to fix this issue, including using a FIFO queue instead of a standard queue, increasing the visibility timeout, increasing the delivery timeout, and using uuid.uuid1() instead of uuid.uuid4(). I am looping through the "Record" list, so I am not sure why the entire batch is not being processed. Below is my latest code to process the messages and send the data to DynamoDB:

import boto3
import json
import uuid
import time

dynamo = boto3.client("dynamodb", "us-east-1")

def lambda_handler(event, context):
    for item in json.loads(event["Records"][0]["body"]):
        item["id"] = uuid.uuid1().bytes
        for key, value in item.items():
            if key == "id":
                item[key] = {"B": bytes(value)}
            elif key == "year":
                item[key] = {"N": str(value)}
            elif key == "amt_harvested":
                item[key] = {"N": str(value)}
            elif key == "consumed":
                item[key] = {"N": str(value)}
            else:
                item[key] = {"S": str(value)}

            time.sleep(0.001)

        dynamo.put_item(TableName="TableOne", Item=dict(item))

Answer


The Lambda event source mapping for SQS polls for messages and invokes the Lambda function with a batch of records, sized by the batch size setting (10 by default). Processing the batch should be done by looping over the event["Records"] array.
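A minimal sketch of that fix, reusing the table name and attribute keys from the question's code (the boto3 client is created inside the handler only to keep the sketch self-contained):

```python
import json
import uuid

# Numeric attributes in this dataset (taken from the question's code);
# everything else is stored as a string.
NUMERIC_KEYS = {"year", "amt_harvested", "consumed"}

def to_dynamo_item(item):
    """Convert a plain dict into DynamoDB attribute-value format,
    adding a fresh 16-byte binary id."""
    out = {"id": {"B": uuid.uuid1().bytes}}
    for key, value in item.items():
        out[key] = {"N": str(value)} if key in NUMERIC_KEYS else {"S": str(value)}
    return out

def lambda_handler(event, context):
    import boto3  # deferred so the helper above stays testable offline
    dynamo = boto3.client("dynamodb", "us-east-1")
    # Iterate over *every* record in the batch, not just Records[0]:
    # with the default batch size of 10, reading only the first record
    # silently drops up to 9 messages per invocation.
    for record in event["Records"]:
        for item in json.loads(record["body"]):
            dynamo.put_item(TableName="TableOne", Item=to_dynamo_item(item))
```

This also drops the per-key elif chain in favor of a set lookup; the behavior for each attribute is unchanged.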


Key factors that should be considered when setting the batch size:

  • If Lambda processing fails, the whole batch is re-sent and retried by AWS. If the function cannot tolerate processing duplicate records, the batch size should be set to 1.
  • If processing a single record in Lambda takes 20 ms, AWS still bills a minimum of 100 ms; simply setting the batch size to 5 can easily cut the cost by a factor of 5.
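The cost point in the second bullet can be sketched with a little arithmetic, assuming the 100 ms minimum billing increment the answer cites (treat that constant as illustrative; billing granularity is an assumption here, not something the question verifies):

```python
# Illustration of the billing claim above, assuming a 100 ms minimum
# billed duration per invocation.
MIN_BILLED_MS = 100

def billed_ms(records, per_record_ms, batch_size):
    """Total billed milliseconds to process `records` messages when each
    invocation handles `batch_size` of them."""
    invocations = -(-records // batch_size)  # ceiling division
    per_invocation = max(per_record_ms * batch_size, MIN_BILLED_MS)
    return invocations * per_invocation

# 1,000 records at 20 ms each:
# batch size 1 -> 1,000 invocations x 100 ms = 100,000 ms billed
# batch size 5 ->   200 invocations x 100 ms =  20,000 ms billed (5x cheaper)
```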

It is always recommended to:

  • Set a larger batch size and make the Lambda code idempotent.
  • Code the Lambda to process all records, whatever the batch size is.
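One way (a sketch, not the only approach) to make the writes idempotent: derive the id from the record's content instead of uuid1/uuid4, so an AWS retry of a partially processed batch regenerates the same key, and guard the put with a condition expression:

```python
import hashlib
import json

def deterministic_id(item):
    """Derive a stable 32-byte id from the record's content, so retried
    batches produce the same key instead of a fresh uuid every time."""
    canonical = json.dumps(item, sort_keys=True).encode()
    return hashlib.sha256(canonical).digest()

def put_once(dynamo, table, dynamo_item):
    """Insert only if the id is not already present; a retried record
    then becomes a no-op instead of a duplicate row."""
    from botocore.exceptions import ClientError  # deferred: keeps the id helper testable offline
    try:
        dynamo.put_item(
            TableName=table,
            Item=dynamo_item,
            ConditionExpression="attribute_not_exists(id)",
        )
    except ClientError as err:
        if err.response["Error"]["Code"] != "ConditionalCheckFailedException":
            raise  # anything other than "already written" is a real failure
```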

