如何获得KafkaListener消费者 [英] how to get KafkaListener Consumer

查看:20
本文介绍了如何获得KafkaListener消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用SpringBoot 2.0.4.RELEASE的Spring-Kafka。

并使用KafkaListener获取消息

现在我要重置我的组的偏移量

但我不知道如何获得群的消费者

    @KafkaListener(id="test",topics={"test"},groupId="group",containerFactory="batchContainerFactory")
    public String listenTopic33(List<ConsumerRecord<Integer, String>> record, Acknowledgment ack){
// do something

    }


    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public void test() {

        MessageListenerContainer test3 = kafkaListenerEndpointRegistry.getListenerContainer("test3");
}

推荐答案

如果要在侦听器本身中查找使用者,只需向侦听器方法添加Consumer<?, ?> consumer参数。

请记住,容器可能已获取更多消息,因此您将在查找生效之前获得它们。您可以设置max.poll.records=1以避免这种情况。

您还可以向容器添加自定义RemainingRecordsErrorHandler,在侦听器中引发异常,错误处理程序将获得剩余的记录,而不是侦听器。

另请参阅Seeking to a Specific Offset

为了查找,监听器必须实现Consumer SeekAware,它有以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

第一个方法在容器启动时调用。当在初始化后的某个任意时间进行搜索时,您应该使用此回调。您应该保存对回调的引用。如果在多个容器中(或在ConCurentMessageListenerContainer中)使用相同的侦听器,则应将回调存储在由侦听器线程设置键的ThreadLocal或其他结构中。

使用组管理时,第二个方法在任务更改时调用。例如,您可以使用此方法通过调用回调来设置分区的初始偏移量。您必须使用回调参数,而不是传递到RegisterSeekCallback中的参数。如果您自己显式分配分区,则永远不会调用此方法。在这种情况下使用TopicPartitionInitialOffset。

回调方法如下:

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToEnd(String topic, int partition);

您还可以在检测到空闲容器时从onIdleContainer()执行查找操作。有关如何启用空闲容器检测的信息,请参阅检测空闲和非响应使用者。

若要在运行时任意查找,请使用相应线程的RegisterSeekCallback中的回调引用。

这里有一个例子;我们跟踪每个主题/分区的回调...

@SpringBootApplication
public class So56584233Application {

    public static void main(String[] args) {
        SpringApplication.run(So56584233Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(new ProducerRecord<>("so56584233", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56584233", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {


    private static final Logger logger = LoggerFactory.getLogger(Listener.class);


    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    private static final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, callbackForThread.get()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "so56584233", topics = "so56584233", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

这篇关于如何获得KafkaListener消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆