为什么使用Java Client API在DC/OS上使用来自Kafka的消息时,用户挂起? [英] Why consumer hangs while consuming messages from Kafka on DC/OS using Client API for Java?

查看:66
本文介绍了为什么使用Java Client API在DC/OS上使用来自Kafka的消息时,用户挂起?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在AWS的DC/OS(Mesos)集群上安装了Kafka.启用了三个代理,并创建了一个名为"topic1"的主题.

I installed Kafka on DC/OS (Mesos) cluster on AWS. Enabled three brokers and created a topic called "topic1".

dcos kafka topic create topic1 --partitions 3 --replication 3

然后,我编写了一个Producer类来发送消息,并编写了一个Consumer类来接收消息.

Then I wrote a Producer class to send messages and a Consumer class to receive them.

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("setting Producerconfig.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProcuder");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}

public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

首先,我在集群上运行了 Producer ,以将消息发送到 topic1 .但是,当我运行 Consumer 时,它什么也没收到,只是挂起.

First I ran Producer on the cluster to send messages to topic1. However when I ran Consumer, it couldn't receive anything, just hang.

Producer 之所以有效,是因为我能够通过运行Kafka install随附的shell脚本来获取所有消息

Producer is working since I was able to get all the messages by running the shell script that came with Kafka install

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

但是为什么我不能收到 Consumer ?这篇帖子建议使用具有旧偏移量的group.id可能是原因.我仅在消费者而不是生产者中创建group.id.如何配置该组的偏移量?

But why can't I receive with Consumer? This post suggests group.id with old offset might be a possible cause. I only create group.id in the consumer not the producer. How do I config the offset for this group?

推荐答案

事实证明, kafkaConsumer.subscribe(Arrays.asList("topic1")); 导致 poll()挂起.根据 Kafka消费者未收到消息,有两种方法可以连接到主题,即 assign subscribe .在用下面的行替换 subscribe 后,它开始工作.

As it turns out, kafkaConsumer.subscribe(Arrays.asList("topic1")); is causing poll() to hang. According to Kafka Consumer does not receive messages , there are two ways to connect to a topic, assign and subscribe. After I replaced subscribe with the lines below, it started working.

    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);

但是,输出显示的是非预期的数字数组(生产者发送了字符串).但是我想这是一个单独的问题.

However the output shows arrays of numbers which is not expected (Producer sent Strings). But I guess this is a separate issue.

这篇关于为什么使用Java Client API在DC/OS上使用来自Kafka的消息时,用户挂起?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆