即使在 Kafka 中进行轮询后,也不会发生当前的分区分配 [英] No current assignment for partition occurs even after poll in Kafka
问题描述
我有使用 Apache Kafka 2.11-0.10.1.0 的 Java 8 应用程序.我需要使用 seek
功能来poll
来自分区的旧消息.但是,我遇到了 No current assignment for partition
的异常,每次我尝试 seekByOffset
时都会发生这种情况.这是我的班级,负责seek
主题到指定的时间戳:
I have Java 8 application working with Apache Kafka 2.11-0.10.1.0. I need to use the seek
feature to poll
old messages from partitions. However I faced an exception of No current assignment for partition
which is occurred every time I am trying to seekByOffset
. Here's my class which is responsible for seek
ing topics to the specified timestamp:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
* to some offset which is determined either by timestamp or by offset number.
*/
public class KafkaSeeker {
public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private final KafkaConsumer<String, String> kafkaConsumer;
private ConsumerRecords<String, String> polledRecords;
public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
}
/**
* For each assigned or subscribed topic {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
* fetching pointer to the specified {@code timestamp}.
* If no messages were found in each partition for a topic,
* then {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} will be called.
*
* Due to {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} laziness
* method needs to execute dummy {@link KafkaConsumer#poll(long)} method. All {@link ConsumerRecords} which were
* polled from buffer are swallowed and produce warning logs.
*
* @param timestamp is used to find proper offset to seek to
* @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
*/
public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
this.polledRecords = kafkaConsumer.poll(0);
Collection<TopicPartition> topicPartitions;
if (CollectionUtils.isEmpty(topics)) {
topicPartitions = kafkaConsumer.assignment();
} else {
topicPartitions = topics.stream()
.map(it -> {
List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
.map(PartitionInfo::partition).collect(Collectors.toList());
return partitions.stream().map(partition -> new TopicPartition(it, partition));
})
.flatMap(it -> it)
.collect(Collectors.toList());
}
if (topicPartitions.isEmpty()) {
throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
}
Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
if (entry.getValue() != null) {
LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
topicPartition.topic(),
topicPartition.partition(),
beginningOffsets.get(topicPartition),
entry.getValue());
kafkaConsumer.seek(topicPartition, entry.getValue().offset());
} else {
LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
topicPartition.topic(),
topicPartition.partition());
kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
}
}
return offsets;
}
public ConsumerRecords<String, String> getPolledRecords() {
return polledRecords;
}
}
在调用该方法之前,我让消费者订阅了一个像这样的主题 consumer.subscribe(singletonList(kafkaTopic));
.当我得到 kafkaConsumer.assignment()
时,它会返回零个 TopicPartition
分配.但是,如果我指定主题并获取其分区,那么我就有了有效的 TopicPartition
,尽管它们在 seek
调用中失败并在标题中出现错误.我忘记了什么?
Before calling the method I have consumer subscribed to a single topic like this consumer.subscribe(singletonList(kafkaTopic));
. When I get kafkaConsumer.assignment()
it returns zero TopicPartition
s assigned. But if I specify the topic and get its partitions then I have valid TopicPartition
s, although they are failing on seek
call with the error in the title. What is something I forgot?
推荐答案
可靠地查找和检查当前分配的正确方法是在订阅后等待 onPartitionsAssigned()
回调.在新创建(仍未连接)的消费者上,调用 poll()
一次并不能保证它会立即连接并分配分区.
The correct way to reliably seek and check current assignment is to wait for the onPartitionsAssigned()
callback after subscribing. On a newly created (still not connected) consumer, calling poll()
once does not guarantees it will immedaitely be connected and assigned partitions.
作为一个基本示例,请参阅下面的代码,它订阅了一个主题,并在分配的回调中寻找所需的位置.最后,您会注意到轮询循环正确地只看到来自搜索位置的记录,而不是来自先前提交或重置的偏移量.
As a basic example, see the code below that subscribes to a topic, and in the assigned callback, seeks to the desired position. Finally you'll notice that the poll loop correctly only sees records from the seek location and not from the previous committed or reset offset.
public static final Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("testtopic", 0), 5L);
public static void main(String args[]) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned " + partitions);
for (TopicPartition tp : partitions) {
OffsetAndMetadata oam = consumer.committed(tp);
if (oam != null) {
System.out.println("Current offset is " + oam.offset());
} else {
System.out.println("No committed offsets");
}
Long offset = offsets.get(tp);
if (offset != null) {
System.out.println("Seeking to " + offset);
consumer.seek(tp, offset);
}
}
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Calling poll");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> r : records) {
System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
}
}
}
}
这篇关于即使在 Kafka 中进行轮询后,也不会发生当前的分区分配的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!