Spring Kafka使用者,在运行时寻求偏移量? [英] Spring kafka consumer, seek offset at runtime?

查看:181
本文介绍了Spring Kafka使用者,在运行时寻求偏移量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是Kafka主题中的KafkaMessageListenerContainer,我有一个应用程序逻辑来处理每条记录,这些记录也依赖于其他微服务.我现在在处理每条记录后手动提交偏移量.

I am using the KafkaMessageListenerContainer for consuming from the kafka topic, I have an application logic to process each record which is dependent on other micro services as well. I am now manually committing the offset after each record is processed.

但是,如果我的应用程序逻辑失败,则需要寻找失败的偏移量并继续对其进行处理,直到成功为止.为此,我需要对最后一个偏移量进行运行时手动搜索.

But if I the application logic fails I need to seek to the failed offset and keep processing it until it's succeeds. For that I need to do a run time manual seek of the last offset.

使用KafkaMessageListenerContainer还可以吗?

Is this possible with the KafkaMessageListenerContainer yet ?

推荐答案

请参见

为了进行搜索,您的侦听器必须实现具有以下方法的ConsumerSeekAware:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

第一个在容器启动时被调用;在初始化后的任意时间搜索时,应使用此回调.您应该保存对回调的引用;如果在多个容器(或ConcurrentMessageListenerContainer)中使用同一侦听器,则应将回调存储在ThreadLocal或由侦听器线程键控的其他结构中.

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

这篇关于Spring Kafka使用者,在运行时寻求偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆