Kafka 0.9.0.1 Java Consumer 卡在 awaitMetadataUpdate() [英] Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()

查看:37
本文介绍了Kafka 0.9.0.1 Java Consumer 卡在 awaitMetadataUpdate()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Java API v0.9.0.1 让一个简单的 Kafka Consumer 工作.我使用的 kafka 服务器是一个 docker 容器,也运行版本 0.9.0.1.以下是消费者代码:

I'm trying to get a simple Kafka Consumer to work using the Java API v0.9.0.1. The kafka server I'm using is a docker container, also running version 0.9.0.1. Below is the consumer code:

public class Consumer {
    public static void main(String[] args) throws IOException {

        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            consumer = new KafkaConsumer<>(properties);
        }

        consumer.subscribe(Arrays.asList("messages"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("Message received: " + record.value());
            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            consumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
}

但是,在启动消费者时,它调用了上面的 poll(100) 方法并且永远不会返回.调试,它看起来像永远在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 中运行以下方法卡住了:

However, when starting the consumer, it calls the poll(100) method above and never returns. Debugging, it looks like it gets stuck running the following method in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient forever:

public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();

    do {
        this.poll(9223372036854775807L);
    } while(this.metadata.version() == version);

}

(版本和 this.metadata.version() 似乎总是 == 2).此外,虽然它没有抛出任何错误,但来自我的 java 生产者的消息从未见过进入队列.我已经验证使用命令行 kafka 工具,我可以从队列发送和接收消息.

(both version and this.metadata.version() always seem to be == 2). Additionally, though it throws no errors, messages from my java producer never seen to make it to the queue. I've verified that using the command line kafka tools, I can both send and receive messages from the queue.

有人知道这里发生了什么吗?

Anyone have any clue what's going on here?

推荐答案

如果这有助于其他有类似问题的人,我的解决方案是设置以下环境变量:

In case this helps anyone else with similar issues, the solution for me was to set the following environment variables:

ADVERTISED_HOST=localhost
ADVERTISED_PORT=9092

(当然,此处的值可能会更改以适合您的安装)

(of course, the values here may change to suit your installation)

显然,命令行消费者和生产者脚本可以在不设置这些环境变量的情况下设法找到并与代理正确通信,但 Java API 实现不能.也不会抛出任何错误,只是在第一次轮询尝试更新元数据时无限循环.

Apparently the command line consumer and producer scripts can manage to find and communicate with the broker correctly without these env variables being set, but the Java API implementations cannot. No errors are thrown either, just an infinite loop on the first poll when it tries to update the metadata.

这篇关于Kafka 0.9.0.1 Java Consumer 卡在 awaitMetadataUpdate()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆