Spring Kafka - Consume last N messages for partition(s) of any topic


Question

I'm trying to read a requested number of Kafka messages. For non-transactional messages we would seek to endOffset - N for each of the M partitions, start polling, and collect messages whose current offset is less than the end offset of each partition. For idempotent/transactional messages we have to account for transaction markers and duplicate messages, meaning the offsets will not be contiguous; in that case endOffset - N will not return N messages, and we would need to go back and seek further until we have N messages for each partition or the beginning offset is reached.
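The seek-target computation described above can be sketched as follows. This is a minimal illustration, not Kafka API code; the class and method names are hypothetical. The start offset per partition is endOffset - N, clamped to the beginning offset so partitions holding fewer than N records are handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: given end and beginning offsets per partition (as
// returned by Consumer#endOffsets / Consumer#beginningOffsets) and the
// requested count N, compute the offset to seek to for each partition.
public class SeekOffsets {
    public static Map<Integer, Long> startOffsets(Map<Integer, Long> endOffsets,
                                                  Map<Integer, Long> beginningOffsets,
                                                  long n) {
        Map<Integer, Long> starts = new HashMap<>();
        endOffsets.forEach((partition, end) ->
            // clamp at the beginning offset for partitions with fewer than n records
            starts.put(partition, Math.max(beginningOffsets.get(partition), end - n)));
        return starts;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 100L, 1, 3L);
        Map<Integer, Long> beginning = Map.of(0, 0L, 1, 0L);
        // partition 0 seeks to 90; partition 1 is clamped to its beginning offset 0
        System.out.println(startOffsets(end, beginning, 10));
    }
}
```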

As there are multiple partitions, I would need to keep track of all the offsets read so I can stop when all are done. There are two steps: the first step calculates the start offset (end offset - requested number of messages) and the end offset (the offsets are not contiguous; there are gaps), and then seeks the partition to start consuming from the start offset. The second step polls the messages and counts the messages in each partition; if we don't meet the requested number of messages, we repeat both steps until we have met the count for each partition.
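The gap problem in step one can be illustrated without a broker. In this sketch (all names hypothetical, offsets simulated as a plain list), seeking to endOffset - N over a log with transaction-marker gaps covers fewer than N records, so the candidate start offset is walked back until N records are covered or the beginning is reached:

```java
import java.util.List;

// Illustration only, no real Kafka calls: with transactional producers the
// consumable offsets have gaps, so endOffset - N can cover fewer than N records.
public class GapAwareStart {
    // count consumable records at or after the candidate start offset
    static long recordsFrom(List<Long> recordOffsets, long start) {
        return recordOffsets.stream().filter(o -> o >= start).count();
    }

    public static long findStart(List<Long> recordOffsets, long beginningOffset,
                                 long endOffset, long n) {
        long start = Math.max(beginningOffset, endOffset - n);
        while (recordsFrom(recordOffsets, start) < n && start > beginningOffset) {
            start--; // step back; real code would seek and poll again from here
        }
        return start;
    }

    public static void main(String[] args) {
        // offsets 2, 5 and 7 are gaps (e.g. transaction markers)
        List<Long> offsets = List.of(0L, 1L, 3L, 4L, 6L, 8L, 9L);
        System.out.println(findStart(offsets, 0L, 10L, 5L)); // walks back from 5 to 3
    }
}
```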

Conditions

The initial poll may not return any records, so continue polling. Stop polling when you have reached the end offset for each partition or a poll returns no results. Check whether the number of messages read for each partition equals the number requested. If yes, mark it as complete; if not, mark it as continue and repeat the steps. Account for gaps in messages. This should work for both transactional and non-transactional producers.
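The stop condition above can be sketched as a per-partition check (names hypothetical, state passed in as plain maps rather than read from a consumer): a partition is done when its read count reaches the requested count or its position has reached its end offset, and polling stops once every partition is done.

```java
import java.util.Map;

// Sketch of the completion check: readCounts tracks messages collected per
// partition, positions the consumer's current offset per partition.
public class CompletionCheck {
    public static boolean done(Map<Integer, Integer> readCounts,
                               Map<Integer, Long> positions,
                               Map<Integer, Long> endOffsets,
                               int requested) {
        return readCounts.entrySet().stream().allMatch(e ->
            e.getValue() >= requested
                || positions.get(e.getKey()) >= endOffsets.get(e.getKey()));
    }

    public static void main(String[] args) {
        // partition 0 has the requested count; partition 1 is short but exhausted
        System.out.println(done(Map.of(0, 5, 1, 3),
                                Map.of(0, 50L, 1, 20L),
                                Map.of(0, 60L, 1, 20L), 5)); // true
    }
}
```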

Questions:

How would I go about keeping track of all the messages that have been read for each partition and break out of the loop? Messages within each partition arrive in order, if that helps.

Does Spring Kafka support such a use case? More details can be found here.

Update: I'm asking to read the last N messages in each partition. The partitions and the number of messages are user input. I would like to keep all the offset management in memory. In essence we are trying to read the messages in LIFO order. This makes it tricky because Kafka lets you read forward, not backward.
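Since Kafka only reads forward, the LIFO view has to be produced client-side: read the last N records in their natural order, then reverse the collected batch in memory. A minimal sketch (helper name hypothetical):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Reverse a forward-read batch to present it newest-first.
public class LifoView {
    public static <T> List<T> lifo(List<T> polledInOrder) {
        List<T> reversed = new ArrayList<>(polledInOrder);
        Collections.reverse(reversed); // reverses in place, so copy first
        return reversed;
    }

    public static void main(String[] args) {
        System.out.println(lifo(List.of("oldest", "middle", "newest")));
    }
}
```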

Answer

So if I understand you correctly, this should be doable with a standard Kafka consumer.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

Consumer<?, Message> consumer = ...

public Map<Integer, List<Message>> readLatestFromPartitions(String topic, Collection<Integer> partitions, int count) {

    // create the TopicPartitions we want to read
    List<TopicPartition> tps = partitions.stream().map(p -> new TopicPartition(topic, p)).collect(toList());
    consumer.assign(tps);

    // create and initialize the result map
    Map<Integer, List<Message>> result = new HashMap<>();
    for (Integer i : partitions) { result.put(i, new ArrayList<>()); }

    // read until the expected count has been read for all partitions
    while (result.values().stream().anyMatch(l -> l.size() < count)) {
        // read until the end of the topic
        ConsumerRecords<?, Message> records = consumer.poll(Duration.ofSeconds(5));
        while (records.count() > 0) {
            for (ConsumerRecord<?, Message> record : records) {
                List<Message> addTo = result.get(record.partition());
                // only keep the latest `count` entries per partition
                if (addTo.size() >= count) {
                    addTo.remove(0);
                }
                addTo.add(record.value());
            }
            records = consumer.poll(Duration.ofSeconds(5));
        }
        // now we have read the whole topic for the given partitions.
        // if all lists contain the expected count, the loop will finish;
        // otherwise it will wait for more data to arrive.
    }

    // the map now contains the messages in the order they were sent,
    // we want them reversed (LIFO)
    Map<Integer, List<Message>> returnValue = new HashMap<>();
    result.forEach((k, v) -> {
        Collections.reverse(v); // reverse returns void, so reverse in place first
        returnValue.put(k, v);
    });
    return returnValue;
}

