KafkaConsumer 0.10 Java API错误消息:没有当前分区分配 [英] KafkaConsumer 0.10 Java API error message: No current assignment for partition

查看:496
本文介绍了KafkaConsumer 0.10 Java API错误消息:没有当前分区分配的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用KafkaConsumer 0.10 Java api.我想从特定的分区和特定的偏移量中消费.我抬起头,发现有一个搜索方法,但是它抛出异常.任何人都有类似的用例或解决方案吗?

I am using KafkaConsumer 0.10 Java api. I want to consume from a specific partition and specific offset. I looked up and found that there is a seek method but its throwing an exception. Anyone had a similar use case or solution ?

代码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

推荐答案

在可以seek()之前,首先需要subscribe()进入主题 将主题划分给消费者.还请记住,subscribe()assign()是惰性的-因此,在使用seek()之前,还需要对poll()进行虚拟调用".

Before you can seek() you first need to subscribe() to a topic or assign() partition of a topic to the consumer. Also keep in mind, that subscribe() and assign() are lazy -- thus, you also need to do a "dummy call" to poll() before you can use seek().

注意:从Kafka 2.0开始,新的poll(Duration timeout)是异步的,并且不能保证poll返回时您具有完整的分配.因此,您可能需要先检查分配,然后再使用seek()poll刷新分配. (参见 KIP-266 了解详情)

Note: as of Kafka 2.0, the new poll(Duration timeout) is async and it's not guaranteed that you have a complete assignment when poll returns. Thus, you might need to check your assignment before using seek() and also poll again to refresh the assignment. (Cf. KIP-266 for details)

如果使用subscribe(),则使用组管理:这样,您可以使用相同的group.id启动多个使用者,并且该主题的所有分区将自动在组内的所有使用者上平均分配(每个分区将获得分配给组中的单个消费者).

If you use subscribe(), you use group management: thus, you can start multiple consumers using the same group.id and all partitions of the topic will be assigned evenly over all consumers within the group automatically (each partition will get assigned to a single consumer in the group).

如果要读取特定分区,则需要通过assign()使用手动分配.这样您就可以进行任何所需的分配.

If you want to read specific partitions, you need to use manual assignment via assign(). This allows you to do any assignment you want.

顺便说一句:KafkaConsumer非常长的JavaDoc类,包括示例.值得一读.

Btw: KafkaConsumer has a very long an detailed class JavaDoc including examples. It's worth to read it.

这篇关于KafkaConsumer 0.10 Java API错误消息:没有当前分区分配的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆