在批处理模式下使用消息时,Spring Aws Kinesis Binder ProvisionedThroughputExceededException [英] Spring Aws Kinesis Binder ProvisionedThroughputExceededException while consuming messages in Batch Mode

本文介绍了在批处理模式下使用消息时,Spring Aws Kinesis Binder ProvisionedThroughputExceededException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用批处理模式从kinesis流中提取记录。



在大多数情况下,我们无法从流中提取消息。



我的配置如下所示



我的配置

  spring:
云:
流:
运动:
活页夹:
锁:
租约时间:30
读容量:1
write容量:1
检查点:$ b​​ $ b read容量:1
writeCapacity:1
绑定:
InStreamGroupOne:
使用者:
侦听器模式:批处理
idleBetweenPolls:30000
记录限制:5000
使用者回退: 1000
绑定:
InStreamGroupOne:
组:流内组
目的地:stream-1
内容类型:application / json
OutboundStreamOne:
目的地:stream-2
内容类型:应用n / json
OutboundStreamTwo:
目的地:stream-3
内容类型:application / json
OutboundStreamThree:
目的地:stream-4
content-类型:application / json

启用调试日志记录后,我可以看到此异常

 收到的错误响应:com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException:超出了为表配置的预配置吞吐量级别。考虑使用UpdateTable API提高配置级别。 (服务:AmazonDynamoDBv2;状态代码:400;错误代码:ProvisionedThroughputExceededException; 

我试图减小批量大小为150,idleBetweenPools为1秒。我还将readCapacity和writeCapacity更新为10,但存在相同的错误。



从AWS控制台中,我可以看到SpringIntegrationLockRegistry已超过读取阈值。 / p>

请帮助我们了解问题所在。



它有时会起作用,而有时会不起作用。

解决方案

关于AWS上的DynamoDB,您可以执行以下操作:如何解决dynamodb的通量错误?



从应用程序角度看,您可以使用锁选项: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder -kinesis-docs / src / main / asciidoc / overview.adoc#lockregistry


leaseDuration



将授予锁租约的时间长度。如果将其设置为例如30秒,则如果至少30秒未发送心跳(例如,如果框或心跳线程死亡,则会发生这种情况),锁定将失效。



默认值:20



heartbeatPeriod



更新DynamoDB的频率以注意到实例仍在运行(建议使该值至少比leaseDuration小3倍-例如heartBeatPeriod = 1秒,leaseDuration = 10秒可能是合理的配置,请确保包括用于网络延迟的缓冲区。)



默认值:5



refreshPeriod



尝试再次获取锁定之前要等待多长时间(例如,如果设置为10秒,它将尝试每10秒执行一次)



默认值:1000



I am using the batch mode to pull in the records from kinesis stream. We are using spring aws kinesis binder.

Most of the times we are not able to pull messages from stream. Only some times we are able to pull messages from stream.

My config looks like below

My config

spring:
  cloud:
    stream:
      kinesis:
        binder:
          locks:
            leaseDuration: 30
            readCapacity: 1
            writeCapacity: 1
          checkpoint:
            readCapacity: 1
            writeCapacity: 1
        bindings:
          InStreamGroupOne:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 30000
              recordsLimit: 5000
              consumer-backoff: 1000
      bindings:
        InStreamGroupOne:
          group: in-stream-group
          destination: stream-1
          content-type: application/json
        OutboundStreamOne:
          destination: stream-2
          content-type: application/json
        OutboundStreamTwo:
          destination: stream-3
          content-type: application/json
        OutboundStreamThree:
          destination: stream-4
          content-type: application/json

When I enable the debug logging, I could able to see this exception

Received error response: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; 

I tried reducing the batch size as 150 and idleBetweenPools to 1sec. I also updated readCapacity and writeCapacity to 10. But same error.

From AWS console, I could see that SpringIntegrationLockRegistry has crossed read threshold.

Can you please help us understand whats wrong.

It works some times and does not work some time.

解决方案

Here is what you can do in regards to DynamoDB on AWS: How to solve throughput error for dynamodb?

From the application perspective, you can play with options for the locks: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#lockregistry

leaseDuration

The length of time that the lease for the lock will be granted for. If this is set to, for example, 30 seconds, then the lock will expire if the heartbeat is not sent for at least 30 seconds (which would happen if the box or the heartbeat thread dies, for example.)

Default: 20

heartbeatPeriod

How often to update DynamoDB to note that the instance is still running (recommendation is to make this at least 3 times smaller than the leaseDuration - for example heartBeatPeriod=1 second, leaseDuration=10 seconds could be a reasonable configuration, make sure to include a buffer for network latency.)

Default: 5

refreshPeriod

How long to wait before trying to get the lock again (if set to 10 seconds, for example, it would attempt to do so every 10 seconds)

Default: 1000

这篇关于在批处理模式下使用消息时,Spring Aws Kinesis Binder ProvisionedThroughputExceededException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆