Kafka messages are reprocessed
Problem description
We have a micro-service that produces and consumes messages from Kafka using spring-boot and spring-cloud-stream.
versions:
spring-boot: 1.5.8.RELEASE
spring-cloud-stream: Ditmars.RELEASE
Kafka server: kafka_2.11-1.0.0
We are working in a Kubernetes environment, using a StatefulSet cluster of 3 Kafka nodes and a cluster of 3 Zookeeper nodes.
We experienced several occurrences of old messages being reprocessed, even though those messages had already been processed a few days earlier.
Several notes:
- Before it happened, the following logs were printed (there were more similar lines; this is just a summary):
Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320
- The above-mentioned incidents of revoking and reassigning partitions happen every few hours, and only in a few of those incidents are old messages re-consumed. In most cases the reassignment doesn't trigger message consumption.
- The messages are from different partitions.
- More than one message per partition is being reprocessed.
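When chasing this kind of behavior, it can help to look at the group's committed offsets around the time of a rebalance; if the committed offsets have disappeared or jumped backwards, the re-consumption is an offset problem rather than a producer problem. A diagnostic sketch using the stock kafka-consumer-groups tool shipped with Kafka 1.0.0 (the broker address is taken from the logs above; adjust it for your cluster):

```
# Describe committed offsets, log end offsets, and lag for the group.
# If CURRENT-OFFSET shows "-" for partitions that were previously consumed,
# the committed offsets have expired or been lost.
bin/kafka-consumer-groups.sh \
  --bootstrap-server dev-kafka-1.kube1.iaas.watercorp.com:9092 \
  --describe \
  --group enrollment-service
```

Running this before and after one of the rebalance incidents should show whether the committed offsets survive the reassignment.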
application.yml:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          zkNodes: zookeeper
          defaultZkPort: 2181
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          user-enrollment-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
          user-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
          enrollment-mail-output:
            producer:
              sync: true
              configuration:
                retries: 10000
          enroll-users-output:
            producer:
              sync: true
              configuration:
                retries: 10000
      default:
        binder: kafka
        contentType: application/json
        group: enrollment-service
        consumer:
          maxAttempts: 1
        producer:
          partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
      bindings:
        user-enrollment-input:
          destination: enroll-users
          consumer:
            concurrency: 10
            partitioned: true
        user-input:
          destination: user
          consumer:
            concurrency: 5
            partitioned: true
        enrollment-mail-output:
          destination: send-enrollment-mail
          producer:
            partitionCount: 10
        enroll-users-output:
          destination: enroll-users
          producer:
            partitionCount: 10
Is there any configuration that I might be missing? What can cause this behavior?
Answer
So the actual problem is the one described in the following ticket: https://issues.apache.org/jira/browse/KAFKA-3806. Using the suggested workaround fixed it.
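For context: KAFKA-3806 is about the mismatch between the broker defaults offsets.retention.minutes (1 day) and log.retention.hours (7 days). Committed consumer offsets can expire while the log data is still retained, so after a rebalance the consumer falls back to auto.offset.reset and re-reads old messages. A sketch of the workaround on the broker side, with an illustrative value (tune it to your own retention needs):

```
# server.properties (broker configuration)
# Keep committed offsets at least as long as the log data itself,
# so a rebalance after an idle period doesn't resurrect old messages.
offsets.retention.minutes=10080   # 7 days, matching log.retention.hours=168
```

Note that this is a broker-level setting; it cannot be fixed from the spring-cloud-stream application.yml shown above.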