计算增量偏移量 Kafka Java [英] Calculate delta Offsets Kafka Java

查看:55
本文介绍了计算增量偏移量 Kafka Java的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在一个 spring 项目中,我使用了 Kafka,现在我想制作一个采用TopicName"的方法.和GroupeId"作为参数并计算主题分区的最后偏移量"之间的差异;和组消耗的偏移量"

In a spring project i used Kafka and now I want to make a method which takes "TopicName" and "GroupeId" as parameters and calculate the difference between "Lastoffsets of the topic partitions" and the "offsets consumed by the group"

对于 lastOffsets 我明白了现在我需要获取消耗的偏移量来计算差异

for the lastOffsets i get it now i need to get the consumed offsets to calculate the difference

public ResponseEntity<Offsets> deltaoffsets (@RequestParam( name = "groupId") String groupId, @RequestParam( name = "topic") String topic) {
        Map<String,Object> properties = (Map) kafkaLocalConsumerConfig.get("kafkaLocalConsumerConfig");
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        List<TopicPartition> partition=new ArrayList<>();

        KafkaConsumer<String, RefentialToReload> kafkaLocalConsumer = new KafkaConsumer<>(properties);

        Map<String, List<PartitionInfo>> topics = kafkaLocalConsumer.listTopics();
        List<PartitionInfo> partitionInfos = topics.get(topic);

        if (partitionInfos == null) {
            log.warn("Partition information was not found for topic");
        }

    else {
            for (PartitionInfo partitionInfo : partitionInfos) {
                TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
                partition.add(topicPartition);
                log.info("partition assigned to kafkaLocalConsumer");
            }
        }
        //get lastOffsets of the topicPartition
        Map<TopicPartition,Long> OffsetsTopicpartition = kafkaLocalConsumer.endOffsets(kafkaLocalConsumer.assignment());
        //here i need to get consumed offsets 
}

推荐答案

beginningOffsets() 是第一个偏移量,而不是最后一个.

beginningOffsets() is the first offsets, not the last.

您可以使用 AdminClient - 这是一个显示当前和结束偏移量的示例...

You can use an AdminClient - here is an example that displays the current and end offsets...

@Bean
public ApplicationRunner runner(KafkaAdmin admin, ConsumerFactory<String, String> cf) throws Exception {
    return args -> {
    try (
            AdminClient client = AdminClient.create(admin.getConfig());
            Consumer<String, String> consumer = cf.createConsumer("group", "clientId", "");
        ) {
        Collection<ConsumerGroupListing> groups = client.listConsumerGroups()
                .all()
                .get(10, TimeUnit.SECONDS);
        groups.forEach(group -> {
            Map<TopicPartition, OffsetAndMetadata> map = null;
            try {
                map = client.listConsumerGroupOffsets(group.groupId())
                        .partitionsToOffsetAndMetadata()
                        .get(10, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                e.printStackTrace();
            }
            catch (TimeoutException e) {
                e.printStackTrace();
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(map.keySet());
            map.forEach((tp, off) -> {
                System.out.println("group: " + group + " tp: " + tp
                        + " current offset: " + off.offset()
                        + " end offset: " + endOffsets.get(tp));
            });
        });
    }
    };
}

这篇关于计算增量偏移量 Kafka Java的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆