Spring kafka 消费者,在运行时寻求偏移量? [英] Spring kafka consumer, seek offset at runtime?

查看:48
本文介绍了Spring kafka 消费者,在运行时寻求偏移量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 KafkaMessageListenerContainer 从 kafka 主题中进行消费,我有一个应用程序逻辑来处理每个依赖于其他微服务的记录.我现在在处理每条记录后手动提交偏移量.

I am using the KafkaMessageListenerContainer for consuming from the kafka topic, I have an application logic to process each record which is dependent on other micro services as well. I am now manually committing the offset after each record is processed.

但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功.为此,我需要对最后一个偏移量进行运行时手动查找.

But if I the application logic fails I need to seek to the failed offset and keep processing it until it's succeeds. For that I need to do a run time manual seek of the last offset.

KafkaMessageListenerContainer 是否可以做到这一点?

Is this possible with the KafkaMessageListenerContainer yet ?

推荐答案

参见 寻求特定偏移.

为了进行搜索,您的侦听器必须实现 ConsumerSeekAware,它具有以下方法:

In order to seek, your listener must implement ConsumerSeekAware which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map assignments, ConsumerSeekCallback callback);

容器启动时调用第一个;在初始化后的某个任意时间进行查找时,应使用此回调.您应该保存对回调的引用;如果您在多个容器(或 ConcurrentMessageListenerContainer)中使用相同的侦听器,则应将回调存储在 ThreadLocal 或由侦听器 Thread 键控的其他结构中.>

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

这篇关于Spring kafka 消费者,在运行时寻求偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆