寻找春天的时间戳-卡夫卡 [英] seekToTimestamp in spring-kafka

查看:40
本文介绍了寻找春天的时间戳-卡夫卡的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试寻找时间戳功能,但由于某些原因,它不适合我。

在我的Producer中,我有下一个代码:

ProducerRecord<String, Obj> producer = new ProducerRecord<>("topic", 0, System.currentTimeMillis() - 10000, "key", obj);
kafkaTemplate.send(producer);

在我的卡夫卡监听器中,我试图寻找比上面的时间戳更高的时间戳的偏移量:

@Component
@RequiredArgsConstructor
@KafkaListener(id = "container",
        topics = "topic",
        clientIdPrefix = "init_client",
        autoStartup = "true")
public class KafkaList implements ConsumerSeekAware {
   @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
                long timestamp = System.currentTimeMillis()+60*1000;
        log.info("Search for a time that is great or equal then {}", timestamp);
        callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);

    }

    @KafkaHandler
    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, Obj obj,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
        log.info("Received message timestamp: {}, date: {}", timestamp,
                Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDate());
    }
}

在日志中,我看到下一个输出:

Search for a time that is great or equal then 1613079865328
Received message timestamp: 1613079798676, date: 2021-02-11

卡夫卡主题1613079798676中的时间戳值低于我的搜索值1613079865328,为什么消费者会选择这个偏移量?

推荐答案

我刚刚用您的代码测试了它,它对我来说工作正常;我在日志中看到了这一点...

2021-02-16 11:30:16.587信息36721-[o66163492-0-C-1]com.example.demo.So66163492应用程序:搜索大于或等于1613493076587的时间

2021-02-16 11:30:16.590信息36721-[o66163492-0-C-1]o.a.k.clients.Consumer:[Consumer ClientID=Consumer-so66163492-1,groupID=so66163492]正在寻找分区so66163492-0的偏移量1

2021-02-16 11:30:16.590信息36721-[o66163492-0-C-1]o.s.k.l.KafkaMessageListenerContainer:so66163492:已分配分区:[so66163492-0]

2021-02-16 11:30:16.611信息36721-[o66163492-0-C-1]com.example.demo.So66163492应用程序:收到消息时间戳:1613529016472 qux

@SpringBootApplication
public class So66163492Application extends AbstractConsumerSeekAware {

    private static final Logger log = LoggerFactory.getLogger(So66163492Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66163492Application.class, args);
    }

    @KafkaListener(id = "so66163492", topics = "so66163492")
    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String obj,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {

        log.info("Received message timestamp: {} {}", timestamp, obj);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so66163492").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(new ProducerRecord<>("so66163492", 0, System.currentTimeMillis() - 10_000, "foo", "bar"));
            template.send(new ProducerRecord<>("so66163492", 0, System.currentTimeMillis() + 36_000_000, "baz", "qux"));
        };
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        long timestamp = System.currentTimeMillis() + 60 * 1000;
        log.info("Search for a time that is great or equal then {}", timestamp);
        callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
    }

}

这篇关于寻找春天的时间戳-卡夫卡的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆