Understanding KafkaConsumer poll() behavior


Question

I'm trying to understand (I'm new to Kafka) how the poll event loop in Kafka works.

Use case: 25 records on the topic, with the max poll size set to 5.

max.poll.interval.ms = 5000 (5 seconds; note that the Kafka default is actually 300000 ms, i.e. 5 minutes)
max.poll.records = 5
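
For reference, the settings above could be collected into a java.util.Properties and passed to `new KafkaConsumer[String, String](props)`. This is a minimal sketch: the config keys are real Kafka consumer settings, but the broker address and group id are hypothetical placeholders. Note that neither `max.poll.records` nor `max.poll.interval.ms` controls where the next poll resumes; `max.poll.interval.ms` is only the maximum allowed delay between poll() calls before the consumer is considered failed and its partitions are reassigned.

```scala
import java.util.Properties

// Consumer settings from the question as plain Properties.
// bootstrap.servers and group.id values are hypothetical placeholders.
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")  // hypothetical broker
props.setProperty("group.id", "poll-demo")                 // hypothetical group id
props.setProperty("enable.auto.commit", "false")           // commit manually with commitSync
props.setProperty("max.poll.records", "5")                 // at most 5 records per poll()
props.setProperty("max.poll.interval.ms", "5000")          // max gap between poll() calls
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
```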

Sequence of tasks

  1. Poll the records from the topic.
  2. Process the records in a for loop.
  3. Some processing logic runs, which either passes or fails.
  4. If the logic passes, the record's offset is added to a map.
  5. The offsets in the map are then committed with a commitSync call.
  6. If it fails, the loop breaks and whatever succeeded before the failure is committed. The problem starts after this.
  7. The next poll just keeps moving forward in batches of 5, even after the error. Is this expected?
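
To see why step 7 happens, here is a toy, pure-Scala model of a single partition. `ToyConsumer`, `process` and `pollOnce` are hypothetical names, not the Kafka client API; the model only mirrors one real behavior of KafkaConsumer: poll() advances an in-memory fetch position regardless of what has been committed. In the real client, committed offsets are read when a partition is assigned (startup or rebalance), not on every poll.

```scala
// Hypothetical toy model of ONE partition, NOT the Kafka client API.
final class ToyConsumer(log: Vector[String], maxPollRecords: Int) {
  private var position = 0L                 // next offset poll() will hand out
  var committed: Option[Long] = None        // committed "next offset to read"

  def poll(): Vector[(Long, String)] = {
    val batch = log.indices
      .slice(position.toInt, position.toInt + maxPollRecords)
      .map(i => (i.toLong, log(i)))
      .toVector
    position += batch.size                  // advances whether or not we commit
    batch
  }
  def commitSync(nextOffset: Long): Unit = committed = Some(nextOffset)
  def seek(offset: Long): Unit = position = offset
}

val consumer = new ToyConsumer(Vector.tabulate(25)(i => s"msg-$i"), maxPollRecords = 5)
def process(offset: Long): Boolean = offset != 2L  // hypothetical: offset 2 always fails

// The loop from the question: process until the first failure, commit the successes.
def pollOnce(): Vector[(Long, String)] = {
  val batch = consumer.poll()
  val succeeded = batch.takeWhile { case (offset, _) => process(offset) }
  // Kafka convention: commit the offset of the NEXT record to read.
  succeeded.lastOption.foreach { case (offset, _) => consumer.commitSync(offset + 1) }
  batch
}

val first = pollOnce()   // offsets 0..4: 0 and 1 succeed, 2 fails, commits 2
val second = pollOnce()  // starts at 5, not 2; all succeed, commits 10
```

So, in this model at least, moving on is the expected behavior, and it is worse than it looks: the commit after the second batch moves the committed offset past the failed record entirely.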

What we expect is that the loop breaks, the offsets of the messages processed successfully up to that point get committed, and the next poll continues from the failed message.

For example, the first poll returns 5 messages; offsets 1 and 2 succeed and are committed, then the 3rd fails. Yet the poll calls keep moving on to the next batch (5-10, 10-15, and so on). If an error occurs anywhere in between, we expect consumption to stop at that point: in the first case the next poll should start from offset 3, and if the second batch fails at offset 8, the next poll should start from offset 8, not from the start of the next batch of max.poll.records (5 in this case). In case it matters, this is a Spring Boot project and enable.auto.commit is false.

I have tried to find this in the documentation, without success.

I also tried tweaking max.poll.interval.ms, but it didn't help.

Not accepting an answer, since there is no direct solution for a custom consumer. Keeping this for reference only.

Recommended answer

On failure, you can manually seek each assigned partition back to the starting offset of the poll. I am not sure how to do this with the Spring consumer.

Sample code for seeking back to the beginning of the poll with a plain (non-Spring) consumer. The code below gets the list of records for each partition and seeks to the offset of that partition's first record.

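import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import scala.jdk.CollectionConverters._

// On failure, rewind every partition that appeared in this batch to the offset
// of its first polled record, so the next poll() re-delivers the whole batch.
def seekBack(consumer: KafkaConsumer[String, String],
             records: ConsumerRecords[String, String]): Unit = {
  records.partitions().asScala.foreach { partition =>
    val partitionedRecords = records.records(partition) // java.util.List of records
    val offset = partitionedRecords.get(0).offset()     // first record's offset
    consumer.seek(partition, offset)
  }
}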
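
The same toy model can illustrate a variation on this idea. It is a sketch under stated assumptions: `ToyConsumer`, `process` and `pollOnce` are hypothetical stand-ins, not the Kafka API. Instead of rewinding to the first record of the batch (which would also re-deliver records that were already committed), it seeks to the failed record's offset; the failure is simulated as transient, succeeding on the second attempt.

```scala
// Hypothetical toy model of ONE partition, NOT the Kafka client API.
final class ToyConsumer(log: Vector[String], maxPollRecords: Int) {
  private var position = 0L                 // next offset poll() will hand out
  var committed: Option[Long] = None        // committed "next offset to read"
  def poll(): Vector[(Long, String)] = {
    val batch = log.indices
      .slice(position.toInt, position.toInt + maxPollRecords)
      .map(i => (i.toLong, log(i)))
      .toVector
    position += batch.size
    batch
  }
  def commitSync(nextOffset: Long): Unit = committed = Some(nextOffset)
  def seek(offset: Long): Unit = position = offset
}

val consumer = new ToyConsumer(Vector.tabulate(25)(i => s"msg-$i"), maxPollRecords = 5)

// Hypothetical transient failure: offset 2 fails on its first attempt only.
var attemptsAt2 = 0
def process(offset: Long): Boolean =
  if (offset == 2L) { attemptsAt2 += 1; attemptsAt2 > 1 } else true

def pollOnce(): Vector[(Long, String)] = {
  val batch = consumer.poll()
  val succeeded = batch.takeWhile { case (offset, _) => process(offset) }
  succeeded.lastOption.foreach { case (offset, _) => consumer.commitSync(offset + 1) }
  if (succeeded.size < batch.size) {
    val (failedOffset, _) = batch(succeeded.size)
    consumer.seek(failedOffset)             // next poll() starts at the failed record
  }
  batch
}

val first = pollOnce()   // 0 and 1 succeed, 2 fails: commit 2, seek to 2
val second = pollOnce()  // re-delivers 2..6; all succeed: commit 7
```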

One caveat: doing this unconditionally in production is a bad idea. You only want to seek back when the error is transient; otherwise you will end up retrying the same records forever.
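
One way to avoid infinite retries is to cap the number of attempts per offset and then skip the record. The sketch below uses the same hypothetical toy model (not the Kafka API); `maxAttempts`, `attempts` and `skipped` are illustrative names, and what to do with a skipped record (log it, park it elsewhere) is left as an assumption.

```scala
import scala.collection.mutable

// Hypothetical toy model of ONE partition, NOT the Kafka client API.
final class ToyConsumer(log: Vector[String], maxPollRecords: Int) {
  private var position = 0L
  var committed: Option[Long] = None
  def poll(): Vector[(Long, String)] = {
    val batch = log.indices
      .slice(position.toInt, position.toInt + maxPollRecords)
      .map(i => (i.toLong, log(i)))
      .toVector
    position += batch.size
    batch
  }
  def commitSync(nextOffset: Long): Unit = committed = Some(nextOffset)
  def seek(offset: Long): Unit = position = offset
}

val consumer = new ToyConsumer(Vector.tabulate(25)(i => s"msg-$i"), maxPollRecords = 5)
val maxAttempts = 3                                    // hypothetical retry budget
val attempts = mutable.Map.empty[Long, Int].withDefaultValue(0)
val skipped = mutable.ArrayBuffer.empty[Long]

// Hypothetical permanent failure: offset 2 can never be processed.
def process(offset: Long): Boolean = offset != 2L

var batch = consumer.poll()
while (batch.nonEmpty) {
  val succeeded = batch.takeWhile { case (offset, _) => process(offset) }
  succeeded.lastOption.foreach { case (offset, _) => consumer.commitSync(offset + 1) }
  if (succeeded.size < batch.size) {
    val (failed, _) = batch(succeeded.size)
    attempts(failed) += 1
    if (attempts(failed) < maxAttempts) consumer.seek(failed)  // retry the same record
    else {
      skipped += failed                                        // give up on this record
      consumer.commitSync(failed + 1)
      consumer.seek(failed + 1)
    }
  }
  batch = consumer.poll()
}
```

Offset 2 is attempted three times and then skipped, after which consumption continues to the end of the partition.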
