Kafka Consumer API无法订阅主题 [英] Kafka consumer api failed to subscribe to topic

查看:101
本文介绍了Kafka Consumer API无法订阅主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用简单的Kafka客户端API.据我所知,有两种方式可以使用消费者消息,订阅主题并为消费者分配分区.

I am using simple Kafka client API. As far as I know there are two ways to consumer messages, subscribe to a topic and assign partition to consumer.

但是第一种方法不起作用.使用者poll()将永远挂起.它仅适用于assign.

However the first method does not work. Consumer poll() would hang forever. It only works with assign.

    // common config for consumer
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", bootstrap);

    config.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    config.put("enable.auto.commit", "true");
    config.put("auto.offset.reset", "earliest");
    config.put("key.deserializer", StringDeserializer.class.getName());
    config.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, deserializer, deserializer);

    // subscribe does not work, poll() hangs
    consumer.subscribe(Arrays.asList(KafkaTestConstants.KAFKA_TOPIC));

这是通过分配分区而起作用的代码.

Here is the code that works by assigning the partition.

    // assign works
    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    consumer.assign(tps);

由于我想使用自动提交功能,该功能应该仅根据此

Since I'd like to utilize the auto commit feature which is supposed to only work with consumer group management according to this post. Why does not subscribe() work?

推荐答案

我遇到了同样的问题. 我使用的是kafka_2.12 jar版本,当我将其降级到kafka_2.11时可以使用.

I faced the same issue. I was using the kafka_2.12 jar version, when I downgrade it to kafka_2.11 it worked.

这篇关于Kafka Consumer API无法订阅主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆