卡夫卡消费者未返回任何事件 [英] Kafka consumer not returning any events
问题描述
下面的Scala kafka使用者未从poll
调用返回任何事件.
The below Scala kafka consumer is not returning any events from the poll
call.
但是,该主题是正确的,我可以看到使用控制台使用者发送到该主题的事件:
However, the topic is correct, and I can see events being sent to the topic using the console consumer:
/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning
当我使用调试器逐步遍历并调用kafkaConsumer.listTopics()
I also see the topic in my Scala code sample below when I step through it with a debugger and invoke kafkaConsumer.listTopics()
此外,这是从单个单元测试中调用的,因此,我仅创建一个具有此特征和使用者的实例(即,另一个使用者实例不能使用消息).我也在使用一个随机的group_id.
Also, this is called from a single unit test, so I'm only creating one instance of this trait and consumer (i.e. another consumer instance can't be consuming the messages). I'm also using a random group_id.
下面的代码/配置有什么问题吗?
Is there anything wrong with the below code/configuration?
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.util.Random
trait KafkaTest {
val kafkaConsumerProperties = new Properties()
kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")
kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)
kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])
kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)
kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))
def checkKafkaHasReceivedEvent(): Assertion = {
val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
...
}
}
增加轮询超时也无济于事.
Increasing the poll timeout doesn't help either.
推荐答案
要从头开始读取AUTO_OFFSET_RESET_CONFIG属性必须设置为最早,默认情况下为最新"
To read from beginning AUTO_OFFSET_RESET_CONFIG property has to be set to earliest, by default it "latest"
kafkaConsumerProperties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase())
这篇关于卡夫卡消费者未返回任何事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!