Kafka 0.9.0.1 Java消费者陷入awaitMetadataUpdate() [英] Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()

查看:1143
本文介绍了Kafka 0.9.0.1 Java消费者陷入awaitMetadataUpdate()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试让一个简单的Kafka Consumer使用Java API v0.9.0.1。我正在使用的kafka服务器是一个docker容器,也运行版本0.9.0.1。下面是使用者代码:

I'm trying to get a simple Kafka Consumer to work using the Java API v0.9.0.1. The kafka server I'm using is a docker container, also running version 0.9.0.1. Below is the consumer code:

public class Consumer {
    public static void main(String[] args) throws IOException {

        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            consumer = new KafkaConsumer<>(properties);
        }

        consumer.subscribe(Arrays.asList("messages"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("Message received: " + record.value());
            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            consumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
}

然而,启动消费者时,它调用上面的poll(100)方法,永远不会返回。调试,它看起来好像在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient中永远运行以下方法:

However, when starting the consumer, it calls the poll(100) method above and never returns. Debugging, it looks like it gets stuck running the following method in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient forever:

public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();

    do {
        this.poll(9223372036854775807L);
    } while(this.metadata.version() == version);

}

(版本和this.metadata.version()始终好像是== 2)。此外,虽然它没有抛出任何错误,但是我的java生产者的消息从未见过它进入队列。我已经验证了使用命令行kafka工具,我可以发送和接收来自队列的消息。

(both version and this.metadata.version() always seem to be == 2). Additionally, though it throws no errors, messages from my java producer never seen to make it to the queue. I've verified that using the command line kafka tools, I can both send and receive messages from the queue.

任何人都知道这里发生了什么?

Anyone have any clue what's going on here?

推荐答案

In这有助于其他有类似问题的人,我的解决方案是设置以下环境变量:

In case this helps anyone else with similar issues, the solution for me was to set the following environment variables:

ADVERTISED_HOST=localhost
ADVERTISED_PORT=9092

(当然,此处的值可能会更改以适合您的安装)

(of course, the values here may change to suit your installation)

显然,命令行使用者和生产者脚本可以设置正确地查找代理并与代理通信,而不设置这些env变量,但Java API实现不能。也不会抛出任何错误,只是在尝试更新元数据时第一次轮询时出现无限循环。

Apparently the command line consumer and producer scripts can manage to find and communicate with the broker correctly without these env variables being set, but the Java API implementations cannot. No errors are thrown either, just an infinite loop on the first poll when it tries to update the metadata.

这篇关于Kafka 0.9.0.1 Java消费者陷入awaitMetadataUpdate()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆