如何在Spring-Kafka中实现Consumer SeekAware? [英] How to implement ConsumerSeekAware in Spring-kafka?

查看:19
本文介绍了如何在Spring-Kafka中实现Consumer SeekAware?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试用@KafkaListener实现一个消费者。 我使用的是Spring2.3.7版本。

以下是我到目前为止的代码

public class SampleListener {

@KafkaListener(topics = "test-topic",
        containerFactory = "sampleKafkaListenerContainerFactory",
        groupId = "test-group")
public void onMessage(@Payload String message,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,
                              @Header(KafkaHeaders.OFFSET) long offset,
                              @Headers MessageHeaders messageHeaders) {

    LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",
            topic, partition, offset, messageHeaders);
    LOGGER.debug("Received Message payload={}", message);
    doSomething(message);

   }
}

我是卡夫卡和春天的新手。我读了Spring-Kafka关于如何寻找偏移量的文档,但无法完全理解。

据我的理解,对于我的用例,当分区分配给容器或在任何其他场景中(确保只读一次)时,我不想再次读取事件。

我看到大多数Consumer实现都实现了ConsumerSeekAware。我知道实现ConsumerSeekAware使我们能够在onIdleContaineronPartitionsAssigned之类的事件上寻找偏移量。我不能理解这些正在处理的方案是什么?

  1. ConsumerSeekAware实现处理哪些场景?实施Kafka Consumer需要寻求补偿的最佳实践或一般场景是什么?

  2. registerSeekCallbackonPartitionsAssigned有什么区别?对于这两种情况,它说每当分配分区时都会调用它们。这两种方法的回调有什么不同?

推荐答案

实现ConsumerSeekAware允许您

a.在初始化期间查找特定的偏移量(或开始、结束或由时间戳表示的偏移量)。

b.Perform在应用程序的生命周期内随时进行查找。

首选技术是在可能的情况下扩展AbstractConsumerSeekAware,因为它处理了大部分基础复杂性。

如果不需要查找,则不需要实现接口(或扩展抽象类)。

据我的理解,对于我的用例,当分区分配给容器或在任何其他场景中(确保只读一次)时,我不想再次读取事件。

容器会自动为您提交偏移量(默认情况下,当poll()返回所有记录时,但您可以将容器AckMode属性设置为RECORD,以便在处理每条记录后提交偏移量)。

下次启动应用程序时,它将从上次提交的偏移量开始消耗。

2.

onPartitionsAssigned在分配分区时调用(最初或在重新平衡之后)。如果您在那里执行查找,它们将在重新平衡期间直接调用使用者。

registerSeekCallback被调用来为应用程序提供一个回调句柄,该回调可以在将来的任何时间被调用。如果容器有并发>;1,则注册多个回调。当您对这些回调执行查找时,它们将排队等待使用者线程在下一次轮询之前调用。(使用者不是线程安全的)。抽象类为您管理这一点,并允许更高级别的抽象...

/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
    getSeekCallbacks()
        .forEach((tp, callback) ->
            callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}

/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
    getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
        .seekRelative(topic, partition, -1, true);
}

在即将发布的2.6.0版本(定于本周发布)中,使用seekToBeginning()seekToEnd()seekToTimeStamp()方法会更简单,它们将对所有分配的分区进行排队搜索。

这篇关于如何在Spring-Kafka中实现Consumer SeekAware?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆