Why does Kafka Consumer keep receiving the same messages (offset)


Question

I have a SOAP web service that sends a Kafka request message and then waits for a Kafka response message (e.g. consumer.poll(10000)).

Each time the web service is called, it creates a new Kafka Producer and a new Kafka Consumer.

Every time I call the web service, the consumer receives the same messages (i.e. messages with the same offsets).

I am using Kafka 0.9 with auto-commit enabled and an auto-commit frequency of 100 ms.

Each ConsumerRecord returned by the poll() method is processed in its own Callable, e.g.

ConsumerRecords<String, String> records = consumer.poll(200);

for (ConsumerRecord<String, String> record : records) {
    final Handler handler = new Handler(record);
    executor.submit(handler);
}

Why do I keep receiving the same messages over and over again?

Update 0001

metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id = 
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

Answer

Based on the code you are showing, I think your problem is that the new consumer is single-threaded. If you poll once and then never poll again, auto-commit (enable.auto.commit) will not take effect, because the consumer commits offsets during subsequent poll() calls.

Try putting your code in a while loop; when you poll again, the offsets will be committed.
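A minimal sketch of the suggested fix, reusing the question's own config values; Handler, executor, the topic name "request-topic", and the running flag are placeholders for the asker's code, not part of any real API:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Keep the consumer alive and polling in a loop. With
// enable.auto.commit=true, offsets of records returned by an earlier
// poll() are committed on later poll() calls (and on close()), so a
// single poll() followed by shutdown never persists any progress --
// which is why a fresh consumer re-reads the same offsets.
Properties props = new Properties();
props.put("bootstrap.servers", "machine1:6667,machine2:6667,machine3:6667,machine0:6667");
props.put("group.id", "group-A.group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("request-topic")); // hypothetical topic name
    while (running) { // shutdown flag supplied by the surrounding service
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (ConsumerRecord<String, String> record : records) {
            executor.submit(new Handler(record)); // Handler/executor from the question
        }
    }
} // close() also commits a final time when auto-commit is enabled

```

Note that handing records to an executor means poll() may return (and eventually auto-commit) before the Handler tasks finish, so a crash can lose in-flight records; if that matters, commit manually with commitSync() after the work completes instead.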
