How to find the offset range for a topic-partition in Kafka 0.10?

Problem Description

I'm using Kafka 0.10.0. Before processing, I want to know the number of records in a partition.

In version 0.9.0.1, I used to find the difference between the latest and earliest offsets for a partition using the code below. In the new version, it gets stuck when calling the consumer#position method.

package org.apache.kafka.example.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FindTopicRange {

    private static Logger logger = LogManager.getLogger();

    public FindTopicRange() {
        // TODO Auto-generated constructor stub
    }

    public static Map<TopicPartition, Range<Long>> getOffsets(String topic) {

        Map<TopicPartition, Range<Long>> partitionToRange = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerConfigs())) {

            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            consumer.assign(partitions);

            for (TopicPartition partition : partitions) {
                // Seek to the beginning; position() then returns the earliest offset.
                consumer.seekToBeginning(Collections.singletonList(partition));
                long earliestOffset = consumer.position(partition);

                // Seek to the end; position() then returns the latest (log-end) offset.
                consumer.seekToEnd(Collections.singletonList(partition));
                long latestOffset = consumer.position(partition);
                partitionToRange.put(partition, Range.between(earliestOffset, latestOffset));
            }
            return partitionToRange;
        } catch (Exception e) {
            logger.error("Exception while getting offset range information for topic : {}", topic, e);
        }
        return partitionToRange;
    }

    private static Properties getConsumerConfigs() {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10240);

        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(getOffsets("hello"));
    }

}

The stack trace for the above call is shown below:

"main" prio=10 tid=0x00007f1750013800 nid=0x443 runnable [0x00007f1756b88000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x00000007c21cba00> (a sun.nio.ch.Util$2)
        - locked <0x00000007c21cb9f0> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000007c21cb8d8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.kafka.common.network.Selector.select(Selector.java:454)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:324)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:306)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1405)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1196)

Answer

I was able to make your example work in Scala (I was already working on similar code). The only addition I made was a consumer.poll call, since both consumer.subscribe and consumer.assign are lazy.

import java.util
import java.util.Collections

import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConversions._

val partitions = new util.ArrayList[TopicPartition]

for (partitionInfo <- consumer.partitionsFor(topic)) {
  partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}

consumer.assign(partitions)

// Poll once so that the lazy assignment takes effect before seeking.
val recordTemp = consumer.poll(1000)

for (partition <- partitions) {
  consumer.seekToBeginning(Collections.singletonList(partition))
  println(consumer.position(partition)) // earliest offset
  consumer.seekToEnd(Collections.singletonList(partition))
  println(consumer.position(partition)) // latest offset
}
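
For comparison, here is a minimal sketch of the same fix applied back to the getOffsets method from the question's Java code. The only assumed change is the consumer.poll call after consumer.assign; all other names come from the question's code, so treat this as a sketch rather than a verified implementation:

            consumer.assign(partitions);

            // Assumed fix (not in the original question): poll once so the lazy
            // assignment takes effect before any seek/position calls.
            consumer.poll(1000);

            for (TopicPartition partition : partitions) {
                consumer.seekToBeginning(Collections.singletonList(partition));
                long earliestOffset = consumer.position(partition);

                consumer.seekToEnd(Collections.singletonList(partition));
                long latestOffset = consumer.position(partition);
                partitionToRange.put(partition, Range.between(earliestOffset, latestOffset));
            }

The records returned by the throwaway poll are simply discarded; the call is only there to force the consumer to finish its assignment and coordinator/metadata bookkeeping before the seeks.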
