Spring Kafka - Consume last N messages for partition(s) for any topic


Problem description


I'm trying to read a requested number of Kafka messages. For non-transactional messages, we would seek to endOffset - N for each of the M partitions, start polling, and collect messages whose current offset is less than the end offset for each partition. For idempotent/transactional messages, we have to account for transaction markers and duplicate messages, meaning offsets will not be contiguous; in that case endOffset - N will not return N messages, and we would need to go back and seek further until we have N messages for each partition or the beginning offset is reached.
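The seek step described above can be sketched as a pure helper (the class and method names here are illustrative, not from the original post): the start offset for each partition is the end offset minus N, clamped at the beginning offset. With a real consumer, the two input maps would come from `consumer.beginningOffsets(tps)` and `consumer.endOffsets(tps)`, followed by `consumer.seek(tp, start)` per partition.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper: given beginning and end offsets per partition,
// compute the offset to seek to so that at most `n` records remain ahead.
public class SeekOffsets {
    public static Map<Integer, Long> startOffsets(Map<Integer, Long> beginningOffsets,
                                                  Map<Integer, Long> endOffsets,
                                                  long n) {
        Map<Integer, Long> starts = new HashMap<>();
        endOffsets.forEach((partition, end) -> {
            long beginning = beginningOffsets.getOrDefault(partition, 0L);
            // clamp at the beginning offset so we never seek before it
            starts.put(partition, Math.max(beginning, end - n));
        });
        return starts;
    }
}
```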


As there are multiple partitions, I would need to keep track of all the offsets read so I can stop when all are done. There are two steps: the first step is to calculate the start offset (end offset - requested number of messages) and the end offset (the offsets are not contiguous; there are gaps), and to seek the partition to start consuming from the start offset. The second step is to poll the messages and count the messages in each partition; if we don't meet the requested number of messages, repeat the first and second steps until we have met the number of messages for each partition.
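The repeat decision between the two steps could be sketched like this (an assumption on my part, with hypothetical names: the step-back amount is simply the number of missing messages, which may still undershoot when offsets have gaps, triggering another round):

```java
// Illustrative sketch: decide whether another seek-and-poll round is needed
// for a partition, and where to seek next. `collected` is how many messages
// we have so far; `currentStart` is where the last round started reading.
public class RepeatStep {
    public static boolean needsMore(int collected, int requested,
                                    long currentStart, long beginningOffset) {
        // done if we have enough messages, or we already started at the very beginning
        return collected < requested && currentStart > beginningOffset;
    }

    public static long nextStart(int collected, int requested,
                                 long currentStart, long beginningOffset) {
        long missing = requested - collected;
        // step back by the number of missing messages, clamped at the beginning;
        // gaps from transaction markers may require repeating this
        return Math.max(beginningOffset, currentStart - missing);
    }
}
```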

Conditions


The initial poll may not return any records, so continue polling. Stop polling when you have reached the end offset for each partition, or when a poll returns no results. Check whether each partition has read as many messages as requested; if yes, mark it complete, if not, mark it as continuing and repeat the steps. Account for gaps in messages. This should work for both transactional and non-transactional producers.
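The stop conditions above can be collected into a single predicate (a sketch with hypothetical names, testable without a broker): polling stops when every tracked partition has either reached its end offset or yielded the requested number of messages.

```java
import java.util.Map;

// Illustrative sketch: per-partition bookkeeping for the stop conditions.
public class StopCondition {
    // true when every partition is finished: either enough messages were
    // collected, or the consumer position reached the end offset
    public static boolean allDone(Map<Integer, Integer> collectedCounts,
                                  Map<Integer, Long> positions,
                                  Map<Integer, Long> endOffsets,
                                  int requested) {
        return positions.entrySet().stream().allMatch(e -> {
            int partition = e.getKey();
            boolean reachedEnd = e.getValue() >= endOffsets.get(partition);
            boolean enough = collectedCounts.getOrDefault(partition, 0) >= requested;
            return reachedEnd || enough;
        });
    }
}
```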

Question:


How would I go about keeping track of all the messages that have been read for each partition, and breaking out of the loop? Messages in each partition come in order, if that is helpful.


Does Spring Kafka support such a use case? More details can be found here.


Update: I'm asking to read the last N messages in each partition. The partitions and the number of messages are user input. I would like to keep all the offset management in memory. In essence, we are trying to read the messages in LIFO order. This makes it tricky, as Kafka allows you to read forward, not backward.

Recommended answer


So if I understand you correctly, this should be doable with a standard Kafka Consumer.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

Consumer<?, Message> consumer = ...

public Map<Integer, List<Message>> readLatestFromPartitions(String topic, Collection<Integer> partitions, int count) {

    // create the TopicPartitions we want to read
    List<TopicPartition> tps = partitions.stream()
            .map(p -> new TopicPartition(topic, p))
            .collect(Collectors.toList());
    consumer.assign(tps);
    // assumption: seek to the beginning so "read the whole topic" below holds
    consumer.seekToBeginning(tps);

    // create and initialize the result map
    Map<Integer, List<Message>> result = new HashMap<>();
    for (Integer i : partitions) {
        result.put(i, new ArrayList<>());
    }

    // read until the expected count has been read for all partitions
    while (result.values().stream().anyMatch(l -> l.size() < count)) {
        // read until the end of the topic
        ConsumerRecords<?, Message> records = consumer.poll(Duration.ofSeconds(5));
        while (records.count() > 0) {
            for (ConsumerRecord<?, Message> record : records) {
                List<Message> addTo = result.get(record.partition());
                // only keep the last `count` entries per partition
                if (addTo.size() >= count) {
                    addTo.remove(0);
                }
                addTo.add(record.value());
            }
            records = consumer.poll(Duration.ofSeconds(5));
        }
        // now we have read the whole topic for the given partitions.
        // if all lists contain the expected count, the loop will finish;
        // otherwise it will wait for more data to arrive.
    }

    // the map now contains the messages in the order they were sent,
    // we want them reversed (LIFO)
    Map<Integer, List<Message>> returnValue = new HashMap<>();
    result.forEach((k, v) -> {
        List<Message> reversed = new ArrayList<>(v);
        Collections.reverse(reversed); // reverse returns void, so reverse a copy in place
        returnValue.put(k, reversed);
    });
    return returnValue;
}

