Kafka Java consumer never receives any messages


Problem Description

I'm trying to set up a basic Java consumer to receive messages from a Kafka topic. I've followed the sample at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example and have this code:

package org.example.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaClientMain 
{

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;  


    public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) 
    {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;
    }    


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }    

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }    


    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        System.out.println( "streams.size = " + streams.size() );

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }    


    public static void main(String[] args) 
    {


        String zooKeeper = "ec2-whatever.compute-1.amazonaws.com:2181";
        String groupId = "group1";
        String topic = "test";

        int threads = 1;

        KafkaClientMain example = new KafkaClientMain(zooKeeper, groupId, topic);

        example.run(threads);

    }

}

package org.example.kafka.client;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable 
{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) 
    {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() 
    {
        System.out.println( "calling ConsumerTest.run()" );
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

        while (it.hasNext())
        {    
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }


        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Kafka is running on the EC2 host in question, and I can send and receive messages on the topic "test" using the kafka-console-producer.sh and kafka-console-consumer.sh tools. Port 2181 is open and available from the machine where the consumer is running (and so is 9092 for good measure, but that didn't seem to help either).
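For comparison, sending a test message from Java rather than the console script would look roughly like the sketch below, using the same 0.8 producer API. The broker host is the same placeholder as the Zookeeper address above, assumed to be on the default broker port 9092:

package org.example.kafka.client;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sketch only: sends a single string message to the "test" topic with the Kafka 0.8 producer API.
public class ProducerSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        // Broker address; this host name is the question's placeholder, not a real endpoint.
        props.put("metadata.broker.list", "ec2-whatever.compute-1.amazonaws.com:9092");
        // Encode message payloads as strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test", "hello from the Java producer"));
        producer.close();
    }
}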

Unfortunately, I never receive any messages in my consumer when I run this: neither the existing messages on the topic, nor new messages that I send with kafka-console-producer.sh while the consumer is running.

This is Kafka 0.8.1.1 running on CentOS 6.4 x64, with OpenJDK 1.7.0_65.

FWIW, when the consumer program starts, I see this Zookeeper output:

[2014-08-01 15:56:38,045] INFO Accepted socket connection from /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,049] INFO Client attempting to establish new session at /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,053] INFO Established session 0x1478e963fb30008 with negotiated timeout 6000 for client /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)

Any idea what might be going on with this? Any and all help is much appreciated.

Answer

Answering this myself for posterity, in case anybody else runs across a similar problem.

The issue was this: the Kafka broker and Zookeeper were on an EC2 node, while the consumer was running locally on my laptop. When connecting to Zookeeper, the client was being handed a reference to "ip-10-0-x-x.ec2.internal", which does not resolve (by default) from outside EC2. This became clear once I properly configured log4j on the client and could see all of the log messages.
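For reference, a minimal log4j 1.x configuration along these lines (a sketch, assuming a log4j.properties file on the consumer's classpath) is enough to surface the client's connection and fetcher logging on the console:

# Sketch: minimal log4j.properties for the consumer's classpath; everything at INFO to stdout.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

With that in place, the client's failed attempts to reach the internal hostname show up in its output.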

The workaround was simply to add an entry to my /etc/hosts file mapping the EC2-internal hostname to the publicly routable IP address.
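For example, an entry along these lines does it (the IP below is just a placeholder from the documentation range; the hostname is whatever internal name Zookeeper hands back):

# /etc/hosts on the consumer machine; 203.0.113.10 stands in for the broker's public IP
203.0.113.10    ip-10-0-x-x.ec2.internal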
