Kafka consumer offsetsForTimes method returns offset positions for only a few partitions, not all
Question
I have one Kafka topic with 8 partitions. A single consumer subscribes to the topic, and that consumer has its own unique consumer group. I want to consume only the recent messages (in my case, those from the 3 minutes before the current time) from all partitions, so I used the offsetsForTimes method as shown below.
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitionInfos.stream().......collect(Collectors.toList());
Long value = Instant.now().minus(120, ChronoUnit.SECONDS).toEpochMilli();
Map<TopicPartition, Long> topicPartitionTime = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> value));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(topicPartitionTime);
The problem is that offsetsForTimes returns offset positions for only one or two partitions and returns null for the rest.
I want to consume the recent messages from all partitions, not just one or two.
I also tried the following:
consumer.unsubscribe();
consumer.assign(allPartitions);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(topicPartitionTime);
but I still get only one or two offset positions. In the worst case, I sometimes get null offsets for all partitions.
If offsetsForTimes works with only one or two partitions, how can I poll the recent records of all partitions from a single consumer?
EDITED: I'm using a Kafka cluster. The 8 partitions are spread across 3-4 machines.
Additional inputs: I am able to reproduce the problem with the scenario below.
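- Create three topics: A (1 partition), B (10 partitions), and C (10 partitions).
- A KafkaStreams application consumes messages from A and pushes them to B and C.
- I sent about 100 messages to topic A. KafkaStreams consumed them and pushed them to topics B and C. I can see the messages spread across all partitions in B and C (i.e. each of the 10 partitions holds about 10 messages).
- I created a KafkaConsumer that consumes topic B. I then call offsetsForTimes with all partitions and a timestamp of 5 minutes before the current time.
- I made sure that consumer.assignment() returns all partitions before calling offsetsForTimes.
- offsetsForTimes returns an offset position for a single partition only, but when I call consumer.poll, it returns messages from the other partitions as well.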
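One possible reading of the last step, assuming the lookup rule documented for offsetsForTimes (the earliest offset whose timestamp is greater than or equal to the target, and null when no such record exists): a partition can hold records and still map to null if all of its record timestamps are older than the target. The toy model below is plain Java, not the Kafka client; the class name, the integer partition ids, and the assumption that offsets start at 0 are hypothetical and only serve the illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a timestamp-to-offset lookup. This is NOT the Kafka client API;
// it only mimics the documented rule on in-memory lists.
final class OffsetLookupModel {

    // Each list index stands for an offset (assumed to start at 0) and each
    // element for that record's timestamp in milliseconds.
    // Returns the earliest offset whose timestamp is >= targetMs,
    // or null if the partition holds no such record.
    static Long offsetForTime(List<Long> timestamps, long targetMs) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetMs) {
                return (long) offset;
            }
        }
        return null;
    }

    // Applies the lookup to every partition. Partitions without a match keep
    // an explicit null entry, which is why put() is used instead of
    // Collectors.toMap (toMap rejects null values).
    static Map<Integer, Long> offsetsForTimes(Map<Integer, List<Long>> partitions, long targetMs) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        partitions.forEach((partition, timestamps) ->
                result.put(partition, offsetForTime(timestamps, targetMs)));
        return result;
    }
}
```

In this model, a partition whose newest record is older than the target timestamp maps to null even though reading it from the beginning would still return records. Whether that is what happens in the scenario above depends on the actual record timestamps, which the question does not show.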
I'm using Apache Kafka version 2.11-2.2.0 with the Kafka clients jar 2.0.1.
Thanks in advance for the help.
Recommended answer
I can't reproduce your condition; the only time I get null for the offset is when nothing has been written to that partition, so there is no record at or after the requested timestamp. For example, I have 10 partitions but only write to 8:
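import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

@SpringBootApplication
public class So59200574Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So59200574Application.class, args);
    }

    // Topic with 10 partitions; only 8 of them receive records (see runner below).
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59200574").partitions(10).replicas(1).build();
    }

    @KafkaListener(id = "so59200574", topics = "so59200574")
    public void listen(String in) {
        System.out.println(in);
    }

    // On assignment, look up the offsets for "ten seconds ago" on every assigned partition.
    @Bean
    public ConsumerAwareRebalanceListener rebal() {
        return new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                final long tenSecondsAgo = System.currentTimeMillis() - 10_000L;
                partitions.forEach(tp -> timestampsToSearch.computeIfAbsent(tp, tp1 -> tenSecondsAgo));
                System.out.println(consumer.offsetsForTimes(timestampsToSearch));
            }

        };
    }

    // Write one record to each of partitions 0-7; partitions 8 and 9 stay empty.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 8).forEach(i -> template.send("so59200574", i, null, "foo" + i));
    }

}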
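If the goal is to start reading at a sensible position on every partition, the null entries need some fallback. A minimal sketch of one option, assuming the end offset is an acceptable fallback (that choice, the class name, and the method name are assumptions, not part of the answer above): with a real consumer, the partition type would be TopicPartition, the first map would hold the offset() of each non-null OffsetAndTimestamp returned by consumer.offsetsForTimes(...), the second would come from consumer.endOffsets(...), and each resolved position would then be passed to consumer.seek(...). The helper is generic, so the merging logic can be checked without a broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: merges timestamp-based offsets with fallback offsets.
final class StartOffsets {

    // timeOffsets may contain null values or miss partitions entirely
    // (no record at or after the requested timestamp).
    // fallbackOffsets lists every partition of interest with the position to
    // use in that case, for example the partition's end offset.
    static <P> Map<P, Long> resolve(Map<P, Long> timeOffsets, Map<P, Long> fallbackOffsets) {
        Map<P, Long> start = new HashMap<>();
        for (Map.Entry<P, Long> entry : fallbackOffsets.entrySet()) {
            Long found = timeOffsets.get(entry.getKey());
            // Prefer the timestamp-based offset; otherwise use the fallback.
            start.put(entry.getKey(), found != null ? found : entry.getValue());
        }
        return start;
    }
}
```

Falling back to the end offset means a partition without recent records only delivers records that arrive later; falling back to the beginning offset would instead replay older records, which is not what the question asks for.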