Why does the consumer hang while consuming messages from Kafka on DC/OS using the Client API for Java?
Question
I installed Kafka on a DC/OS (Mesos) cluster on AWS, enabled three brokers, and created a topic called "topic1".
dcos kafka topic create topic1 --partitions 3 --replication 3
Then I wrote a Producer class to send messages and a Consumer class to receive them.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("Setting producer config.");
        producerConfig.put("bootstrap.servers",
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProducer");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }
}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers",
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}
First I ran Producer on the cluster to send messages to topic1. However, when I ran Consumer, it couldn't receive anything; it just hung.
Producer is working, since I was able to get all the messages by running the console-consumer shell script that came with the Kafka install:
./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning
But why can't I receive anything with Consumer? This post suggests that a group.id with an old offset might be a possible cause. I only set group.id in the consumer, not the producer. How do I configure the offset for this group?
Answer
As it turns out, kafkaConsumer.subscribe(Arrays.asList("topic1")); was causing poll() to hang. According to Kafka Consumer does not receive messages, there are two ways to connect to a topic: assign and subscribe. After I replaced subscribe with the lines below, it started working.
// requires: import org.apache.kafka.common.TopicPartition;
TopicPartition tp = new TopicPartition("topic1", 0);
List<TopicPartition> tps = Arrays.asList(tp);
kafkaConsumer.assign(tps);
However, the output shows arrays of numbers instead of the Strings the Producer sent. But I guess that is a separate issue.
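That last symptom is just the ByteArrayDeserializer at work: the consumer hands back raw byte[] values, and Arrays.toString prints them as their numeric byte values. A minimal sketch (class and method names are my own, not from the post) showing that decoding the bytes as UTF-8 recovers the original text:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DecodeDemo {
    // Decode a record value produced via String.getBytes() back into text.
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate what the Producer sends: msg + i, encoded to bytes.
        byte[] wire = ("Kafka test message 2/27 3:32" + 0).getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(wire)); // the "arrays of numbers" the Consumer printed
        System.out.println(decode(wire));          // the original string, "Kafka test message 2/27 3:320"
    }
}
```

Alternatively, configuring StringSerializer on the producer and StringDeserializer on the consumer avoids the manual conversion entirely.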