kafka-consumer-api相关内容

Kafka如何从__consumer_offsets主题阅读

我正在尝试找出当前我的高级消费者正在解决的抵消措施.我使用Kafka 0.8.2.1,在Kafka的server.properties中设置了否"offset.storage"-我认为这意味着偏移量存储在Kafka中. (通过检查Zk shell中的以下路径,我还验证了Zookeeper中没有存储偏移量:/consumers/consumer_group_name/offsets/topic_na ..
发布时间:2020-04-25 08:25:29 其他开发

Kafka 0.10.0.0和更高版本的session.timeout.ms和max.poll.interval.ms之间的差异

我不清楚为什么我们同时需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或多个?似乎这两个设置都指示协调器在假定死机之前等待从消费者那里获取心跳的时间上限. 对于基于解决方案 在KIP-62之前,只有session.timeout.ms(即Kafka 0.10.0和更早版本).通过max.poll.interval.ms "rel =" no ..
发布时间:2020-04-25 08:25:25 其他开发

提取具有关联ID 1的元数据时发生警告错误:{MY_TOPIC?= INVALID_TOPIC_EXCEPTION}(org.apache.kafka.clients.NetworkClient)

当我使用kafka 0.9.0.1运行以下命令时,收到此警告[1].您能告诉我我的主题有什么问题吗? (我正在与在ec2中运行的kafka经纪人交谈) #./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC? [1] [2016-04-06 ..

卡夫卡模式订阅。新主题没有触发重新平衡

根据 kafka javadocs 如果我: 订阅模式 创建与模式匹配的主题 重新平衡应该发生,这使消费者从该新主题中读取。但这种情况并没有发生。 如果我停止并启动消费者,它确实会接受新主题。所以我知道新主题与模式匹配。在 https:// stackoverflow中可能存在此问题的重复内容。 com / questions / 37120537 / whitelist-fi ..
发布时间:2019-01-02 20:55:31 Java开发

Kafka CommitFailedException消费者异常

在创建多个使用者(使用Kafka 0.9 java API)并且每个线程启动后,我收到以下异常 消费者失败但异常:org.apache.kafka.clients.consumer.CommitFailedException:由于组重新平衡 class com.messagehub.consumer.Consumer正在关闭,因此无法完成提交。 org.apache.kafka.cli ..
发布时间:2019-01-02 15:03:17 Java开发

如何暂停卡夫卡消费者?

我在我的框架中使用Kafka生产者 - 消费者模型。消费者端消耗的记录随后被索引到elasticsearch上。在这里我有一个用例,如果ES关闭,我将不得不暂停kafka消费者直到ES启动,一旦启动,我需要恢复消费者并消耗我上次离开的记录。 我不认为这可以用@KafkaListener实现。有谁能请给我一个解决方案吗?我想我需要为此编写自己的KafkaListenerContainer,但我无法 ..
发布时间:2019-01-02 10:56:24 Java开发

如何获得kafka主题的最新偏移量?

我正在使用Java编写 kafka 使用者。我想保留消息的实时,所以如果有太多消息等待消费,例如1000或更多,我应该放弃未使用的消息并开始使用最新的消息。 对于这个问题,我尝试比较最后提交的偏移量和主题的最新偏移量(只有1个分区),如果这两个偏移量之间的差异大于某个量,我将设置作为下一个偏移量的主题的最新偏移量,以便我可以放弃那些冗余消息。 现在我的问题是如何获得主题的最新偏移量,有 ..
发布时间:2018-12-28 20:41:24 Java开发

Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer

每当我尝试从kafka队列中读取消息时,我都会遇到以下异常: [error](运行) -main-0)java.lang.ClassCastException:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer java.lang.ClassCastException:o ..

Kafka使用者 - 消费者进程和线程与主题分区的关系是什么

我最近一直与卡夫卡合作,对消费者群体下的消费者有点混淆。混淆的中心是将消费者实现为流程还是线程。对于这个问题,假设我使用的是高级消费者。 让我们考虑一下我已经尝试过的场景。在我的主题中有2个分区(为简单起见,我们假设复制因子只有1)。我创建了一个消费者( ConsumerConnector )进程 consumer1 ,其中组 group1 ,然后创建一个大小为2的主题计数图,然后在该过程下 ..

Kafka如何为每个主题存储偏移量?

在轮询Kafka时,我使用 subscribe()函数订阅了多个主题。现在,我想设置我想从每个主题中读取的偏移量,而不是在每次 seek()和 poll()之后重新订阅来自一个主题。将在每个主题名称上迭代地调用 seek(),在轮询数据之前实现结果? 如何在Kafka中准确存储偏移量? 我每个主题只有一个分区,只有一个消费者可以从所有主题中读取。 解决方案 每个主题的KAFKA商店 ..
发布时间:2018-12-27 18:29:52 Java开发

带解码器问题的Kafka Avro Consumer

当我尝试使用我各自的架构运行 Kafka Consumer with Avro 时,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。我看到其他人也有类似的问题将字节数组转换为json , Avro写入和读取,以及 Kafka Avro Binary *编码器。我还引用了这个 Consumer Group Example 有帮助,但到目前为止没有帮助这个错误 ..
发布时间:2018-12-05 20:02:18 Java开发