Kafka Java consumer never receives any messages

Question

I'm trying to set up a basic Java consumer to receive messages from a Kafka topic. I followed the sample at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example and have this code:

package org.example.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaClientMain 
{

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;  


    public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) 
    {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;
    }    


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }    

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }    


    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        System.out.println( "streams.size = " + streams.size() );

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }    


    public static void main(String[] args) 
    {


        String zooKeeper = "ec2-whatever.compute-1.amazonaws.com:2181";
        String groupId = "group1";
        String topic = "test";

        int threads = 1;

        KafkaClientMain example = new KafkaClientMain(zooKeeper, groupId, topic);

        example.run(threads);

    }

}

package org.example.kafka.client;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable 
{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) 
    {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() 
    {
        System.out.println( "calling ConsumerTest.run()" );
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

        while (it.hasNext())
        {    
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }


        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Kafka is running on the EC2 host in question, and I can send and receive messages on the topic "test" using the kafka-console-producer.sh and kafka-console-consumer.sh tools. Port 2181 is open and available from the machine where the consumer is running (and so is 9092 for good measure, but that didn't seem to help either).

Unfortunately, my consumer never receives any messages when I run this: neither messages already on the topic nor new ones that I send with kafka-console-producer.sh while the consumer is running.

This is using Kafka 0.8.1.1 running on CentOS 6.4 x64, using OpenJDK 1.7.0_65.

FWIW, when the consumer program starts, I see this Zookeeper output:

[2014-08-01 15:56:38,045] INFO Accepted socket connection from /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,049] INFO Client attempting to establish new session at /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,053] INFO Established session 0x1478e963fb30008 with negotiated timeout 6000 for client /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)

Any idea what might be going on with this? Any and all help is much appreciated.

Recommended answer

Answering this myself for posterity, in case anybody else runs across a similar problem.

The issue was this: The Kafka broker and Zookeeper were on an EC2 node, and the consumer was on my laptop running locally. When connecting to Zookeeper, the client was getting handed a reference to "ip-10-0-x-x.ec2.internal", which does not resolve (by default) from outside of EC2. This became clear once I properly configured log4j on the client so I was getting all of the log messages.
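One quick way to confirm this kind of problem is to check whether the hostname the client is handed actually resolves on the machine running the consumer. The following is only a minimal sketch: the class name BrokerHostCheck and the method canResolve are made up for illustration and are not part of Kafka or of the original code.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BrokerHostCheck {

    // Returns true if this machine can resolve the given hostname to an IP address.
    public static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            // The name is unknown to this machine's resolver (DNS or hosts file).
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the hostname reported in the client logs as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println(host + " resolves: " + canResolve(host));
    }
}
```

Run it on the consumer machine with the internal hostname from the logs as the argument. If it reports that the name does not resolve, the consumer cannot reach the broker at that name either, even though the ZooKeeper connection itself succeeds.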

The workaround was to just put an entry in my /etc/hosts file, mapping the ec2 internal hostname to the publicly routable IP address.
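As a rough illustration, such an entry might look like the lines below. The address 203.0.113.10 is a placeholder from the range reserved for documentation, not the real IP, and the hostname is left elided exactly as above; substitute the actual public IP of the EC2 node and the internal hostname that shows up in the client logs.

```
# /etc/hosts on the machine running the consumer
# <public IP of the EC2 node>   <internal hostname handed back via Zookeeper>
203.0.113.10   ip-10-0-x-x.ec2.internal
```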
