Spring Aws Kinesis Binder ProvisionedThroughputExceededException,同时在批处理模式下使用消息 [英] Spring Aws Kinesis Binder ProvisionedThroughputExceededException while consuming messages in Batch Mode
问题描述
我正在使用批处理模式从 kinesis 流中提取记录.我们正在使用 spring aws kinesis binder.
I am using the batch mode to pull in the records from kinesis stream. We are using spring aws kinesis binder.
大多数时候我们无法从流中提取消息.只有有时我们能够从流中提取消息.
Most of the times we are not able to pull messages from stream. Only some times we are able to pull messages from stream.
我的配置如下
我的配置
spring:
cloud:
stream:
kinesis:
binder:
locks:
leaseDuration: 30
readCapacity: 1
writeCapacity: 1
checkpoint:
readCapacity: 1
writeCapacity: 1
bindings:
InStreamGroupOne:
consumer:
listenerMode: batch
idleBetweenPolls: 30000
recordsLimit: 5000
consumer-backoff: 1000
bindings:
InStreamGroupOne:
group: in-stream-group
destination: stream-1
content-type: application/json
OutboundStreamOne:
destination: stream-2
content-type: application/json
OutboundStreamTwo:
destination: stream-3
content-type: application/json
OutboundStreamThree:
destination: stream-4
content-type: application/json
当我启用调试日志时,我可以看到这个异常
When I enable the debug logging, I could able to see this exception
Received error response: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException;
我尝试将批量大小减少为 150,将 idleBetweenPools 减少到 1 秒.我还将 readCapacity 和 writeCapacity 更新为 10.但同样的错误.
I tried reducing the batch size as 150 and idleBetweenPools to 1sec. I also updated readCapacity and writeCapacity to 10. But same error.
从 AWS 控制台,我可以看到 SpringIntegrationLockRegistry 已超过读取阈值.
From AWS console, I could see that SpringIntegrationLockRegistry has crossed read threshold.
能否请您帮助我们了解问题所在.
Can you please help us understand whats wrong.
它有时有效,有时不起作用.
It works some times and does not work some time.
推荐答案
对于 AWS 上的 DynamoDB,您可以执行以下操作:如何解决dynamodb的吞吐量错误?
Here is what you can do in regards to DynamoDB on AWS: How to solve throughput error for dynamodb?
从应用程序的角度来看,您可以使用锁定选项:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#lockregistry
From the application perspective, you can play with options for the locks: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#lockregistry
leaseDuration
将授予锁租用的时间长度.例如,如果将其设置为 30 秒,那么如果在至少 30 秒内未发送心跳,则锁定将过期(例如,如果框或心跳线程死亡,就会发生这种情况.)
The length of time that the lease for the lock will be granted for. If this is set to, for example, 30 seconds, then the lock will expire if the heartbeat is not sent for at least 30 seconds (which would happen if the box or the heartbeat thread dies, for example.)
默认值:20
heartbeatPeriod
多久更新一次 DynamoDB 以注意实例仍在运行(建议将此至少设置为leagueDuration 的 3 倍 - 例如 heartBeatPeriod=1 秒,leaseDuration=10 秒可能是合理的配置,请确保包括用于网络延迟的缓冲区.)
How often to update DynamoDB to note that the instance is still running (recommendation is to make this at least 3 times smaller than the leaseDuration - for example heartBeatPeriod=1 second, leaseDuration=10 seconds could be a reasonable configuration, make sure to include a buffer for network latency.)
默认值:5
refreshPeriod
在尝试再次获取锁之前等待多长时间(例如,如果设置为 10 秒,它将每 10 秒尝试一次)
How long to wait before trying to get the lock again (if set to 10 seconds, for example, it would attempt to do so every 10 seconds)
默认值:1000
这篇关于Spring Aws Kinesis Binder ProvisionedThroughputExceededException,同时在批处理模式下使用消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!