Why does Kafka Consumer keep receiving the same messages (offset)


Problem description

I have a SOAP web service that sends a Kafka request message and waits for a Kafka response message (e.g. consumer.poll(10000)).

Each time the web service is called, it creates a new Kafka Producer and a new Kafka Consumer.

Every time I call the web service, the consumer receives the same messages (e.g. messages with the same offset).

I am using Kafka 0.9 with auto commit enabled and an auto commit frequency of 100 ms.

For each ConsumerRecord returned by the poll() method, I process it within its own Callable, e.g.

ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
    final Handler handler = new Handler(record);
    executor.submit(handler);
}
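As an aside, if the consumer really is short-lived (one poll per web-service call), auto-commit may never get a chance to fire, since it piggybacks on subsequent poll() calls. A minimal illustrative fragment (not the original code) that commits the consumed offsets explicitly before closing could look like this:

```java
// Illustrative fragment: commit offsets synchronously before closing
// a short-lived consumer, instead of relying on auto-commit.
ConsumerRecords<String, String> records = consumer.poll(10000);
for (ConsumerRecord<String, String> record : records) {
    executor.submit(new Handler(record));
}
consumer.commitSync(); // persist the offsets returned by this poll
consumer.close();
```

Note that committing here marks the records as consumed even though the Handler tasks may still be running, so a crash at that point could lose messages; it trades at-least-once processing for not re-reading the same offsets.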

Why do I keep receiving the same messages over and over again?

Update 0001: my consumer configuration is:

metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id = 
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

Recommended answer

Based on the code you are showing, I think your problem is that the new Consumer is single-threaded. If you poll once and then never poll again, auto.commit.offset is not going to work: the auto-commit is performed as part of subsequent poll() calls. Try putting your code in a while loop and see whether the offset is committed when you call poll() again.
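A minimal sketch of that while loop, assuming a hypothetical topic name "responses" and plain String deserializers in place of the custom ones from the configuration above (it needs a running broker to actually consume anything):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "machine1:6667");
        props.put("group.id", "group-A.group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("responses"));
        try {
            while (true) {
                // Each poll() also triggers the auto-commit of the offsets
                // returned by the previous poll(), once the commit interval
                // has elapsed.
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n",
                            record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

Because the commit happens on a later poll(), a consumer that polls exactly once and then exits (as in the question) never commits, so the next consumer in the same group starts from the old offset and sees the same messages again.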
