Kafka messages are reprocessed


Problem description

We have a microservice that produces and consumes messages from Kafka using spring-boot and spring-cloud-stream.
Versions:
spring-boot: 1.5.8.RELEASE
spring-cloud-stream: Ditmars.RELEASE
Kafka server: kafka_2.11-1.0.0

EDIT: We are running in a Kubernetes environment, with a StatefulSet cluster of 3 Kafka nodes and a cluster of 3 ZooKeeper nodes.

We have seen several occurrences of old messages being reprocessed even though they had already been processed a few days earlier.
Several notes:

  1. Before this happens, the following logs are printed (there are more similar lines; this is just a summary):

Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320

  2. The revoking and reassigning of partitions described above happens every few hours, but old messages are re-consumed in only a few of those incidents. In most cases the reassignment does not trigger any message consumption.
  3. The reprocessed messages come from different partitions.
  4. More than one message per partition is reprocessed.

application.yml:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-enrollment-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            enrollment-mail-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
            enroll-users-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
        bindings:
          user-enrollment-input:
            destination: enroll-users
            consumer:
              concurrency: 10
              partitioned: true
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true
          enrollment-mail-output:
            destination: send-enrollment-mail
            producer:
              partitionCount: 10
          enroll-users-output:
            destination: enroll-users
            producer:
              partitionCount: 10

Is there any configuration that I might be missing? What can cause this behavior?

Solution

The actual problem turned out to be the one described in this ticket: https://issues.apache.org/jira/browse/KAFKA-3806. Applying the workaround suggested there fixed it.
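KAFKA-3806 concerns the broker defaults of that era: `offsets.retention.minutes` was 1440 (one day) while `log.retention.hours` was 168 (seven days). Committed consumer offsets can therefore be deleted while the messages they point to are still on disk. When the next rebalance happens, the consumer finds no committed offset and falls back to its reset policy; because the bindings set an explicit group (`group: enrollment-service`), the Spring Cloud Stream Kafka binder defaults `startOffset` to `earliest`, so the partition is replayed from the oldest retained message. The Java sketch below is a simplified model of that decision, not Kafka's or the binder's actual code. It assumes the partition received no new commits since t = 0 and that the broker defaults apply; the class and method names are hypothetical. The workaround usually associated with the ticket is raising `offsets.retention.minutes` on the brokers to at least the log retention, although the answer does not say which value the author chose.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of how a group consumer picks its start
 * position after a rebalance when committed offsets can expire (Kafka 1.0 era).
 */
public class OffsetExpiryModel {

    // Kafka 1.0 broker defaults: offsets kept for 1 day, log data kept for 7 days.
    static final long OFFSETS_RETENTION_MS = TimeUnit.MINUTES.toMillis(1440);
    static final long LOG_RETENTION_MS = TimeUnit.HOURS.toMillis(168);

    /**
     * Returns the offset the consumer resumes from after a rebalance at nowMs.
     * Simplification: the real broker deletes expired offsets in a periodic
     * cleanup task, not at the exact moment the retention period elapses.
     */
    static long startPosition(long committedOffset, long lastCommitMs, long logStartOffset,
                              long nowMs, long offsetsRetentionMs) {
        boolean offsetExpired = nowMs - lastCommitMs > offsetsRetentionMs;
        if (!offsetExpired) {
            // Normal case: resume where the group left off.
            return committedOffset;
        }
        // No committed offset remains, so the reset policy applies; with an explicit
        // group the binder's startOffset defaults to earliest (assumed here).
        return logStartOffset;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        long lastCommit = 0L;       // last message on this partition processed at t = 0
        long rebalanceAt = 3 * day; // rebalance three days later, no commits in between
        long committed = 42L;       // next offset to read: offsets 0..41 already processed
        long logStart = 0L;         // still inside the 7-day log retention

        long withDefaults = startPosition(committed, lastCommit, logStart,
                rebalanceAt, OFFSETS_RETENTION_MS);
        long withLongerRetention = startPosition(committed, lastCommit, logStart,
                rebalanceAt, LOG_RETENTION_MS);

        System.out.println("default offsets retention -> resume at " + withDefaults
                + " (reprocesses " + (committed - withDefaults) + " messages)");
        System.out.println("offsets retention >= log retention -> resume at " + withLongerRetention);
    }
}
```

Running `main` prints that, with the defaults, the consumer resumes at offset 0 and reprocesses 42 messages, whereas an offsets retention at least as long as the log retention resumes at 42. The same model is consistent with the observation that only some rebalances replay messages: partitions that committed within the last day keep their offsets, and only quiet partitions fall back to the earliest offset.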
