Apache Kafka LEADER_NOT_AVAILABLE [英] Apache Kafka LEADER_NOT_AVAILABLE
问题描述
我遇到了我不了解的Apache Kafka问题.我在经纪人中订阅了一个名为"topic-received"的主题.这是代码:
I'm running into an issue with apache Kafka that I don't understand . I subscribe to a topic in my broker called "topic-received" . This is the code :
protected String readResponse(final String idMessage) {
if (props != null) {
kafkaClient = new KafkaConsumer<>(props);
logger.debug("Subscribed to topic-received");
kafkaClient.subscribe(Arrays.asList("topic-received"));
logger.debug("Waiting for reading : topic-received");
ConsumerRecords<String, String> records =
kafkaClient.poll(kafkaConfig.getRead_timeout());
if (records != null) {
for (ConsumerRecord<String, String> record : records) {
logger.debug("Resultado devuelto : "+record.value());
return record.value();
}
}
}
return null;
}
发生这种情况时,我从另一点向主题已接收"发送了一条消息.代码如下:
As this is happening, I send a message to "topic-received" from another point . The code is the following one :
private void sendMessageToKafkaBroker(String idTopic, String value) {
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(mapProperties());
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>("topic-received", value);
producer.send(producerRecord);
logger.info("Sended value "+value+" to topic-received");
} catch (ExceptionInInitializerError eix) {
eix.printStackTrace();
} catch (KafkaException ke) {
ke.printStackTrace();
} finally {
if (producer != null) {
producer.close();
}
}
}
我第一次尝试以主题已接收"为主题,我会收到这样的警告
First time I try , with topic "topic-received", I get a warning like this
"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient :
Error while fetching metadata with correlation id 1 : {topic-
received=LEADER_NOT_AVAILABLE}"
但是,如果我再试一次,对于该主题已收到主题",效果很好,并且未显示警告.无论如何,这对我来说没有用,因为我每次都必须听一个主题并将其发送到一个新主题(由String标识符引用,例如:.. 12Erw45-2345Saf-234DASDFasd)
But if I try again, to this topic "topic-received", works ok, and no warning is presented . Anyway, that's not useful for me, because I have to listen from a topic and send to a topic new each time ( referenced by an String identifier ex: .. 12Erw45-2345Saf-234DASDFasd )
在Google中寻找LEADER_NOT_AVAILABLE时,有些人谈论将以下行添加到server.properties中:
Looking for LEADER_NOT_AVAILABLE in google , some guys talk about adding to server.properties the next lines :
host.name=127.0.0.1
advertised.port=9092
advertised.host.name=127.0.0.1
但是它对我不起作用(不知道为什么).
But it's not working for me ( Don't know why ) .
在所有此过程之前,我都尝试使用以下代码创建主题:
I have tried to create the topic before all this process with the following code:
private void createTopic(String idTopic) {
String zookeeperConnect = "localhost:2181";
ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000,
ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect),false);
if(!AdminUtils.topicExists(zkUtils,idTopic)) {
AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(),
null);
logger.debug("Created topic "+idTopic+" by super user");
}
else{
logger.debug("topic "+idTopic+" already exists");
}
}
没有错误,但仍会继续监听直到超时.
No error, but still, it stays listening till the timeout.
我已经审查了代理的属性,以检查是否有任何帮助,但是我还没有发现足够清楚的内容.我用来阅读的道具是:
I have reviewed the properties of the broker to check if there's any help, but I haven't found anything clear enough . The props that I have used for reading are :
props = new Properties();
props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers());
props.put("key.deserializer", kafkaConfig.getKey_deserializer());
props.put("value.deserializer", kafkaConfig.getValue_deserializer());
props.put("key.serializer", kafkaConfig.getKey_serializer());
props.put("value.serializer", kafkaConfig.getValue_serializer());
props.put("group.id",kafkaConfig.getGroupId());
和,用于发送...
Properties props = new Properties();
props.put("bootstrap.servers", kafkaConfig.getHost() + ":" +
kafkaConfig.getPort());
props.put("group.id", kafkaConfig.getGroup_id());
props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit());
props.put("auto.commit.interval.ms",
kafkaConfig.getAuto_commit_interval_ms());
props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms());
props.put("key.deserializer", kafkaConfig.getKey_deserializer());
props.put("value.deserializer", kafkaConfig.getValue_deserializer());
props.put("key.serializer", kafkaConfig.getKey_serializer());
props.put("value.serializer", kafkaConfig.getValue_serializer());
有任何线索吗?为什么我必须使用来自代理和主题的消息的唯一方法是在出错后重复请求?
Any clue ? Why , the only way that I have to consume messages from the broker and from the topic, is repeating the request after an error ?
预先感谢
推荐答案
在尝试向不存在的主题生成消息时会发生这种情况
请注意::在某些Kafka安装中,该框架可以在不存在该主题时自动创建该主题,这说明了为什么一开始只能看到一次该问题.
This happens when trying to produce messages to a topic that doesn't exist
PLEASE NOTE: In some Kafka installations, the framework can automatically create the topic when it doesn't exist, that explains why you see the issue only once at the very beginning.
这篇关于Apache Kafka LEADER_NOT_AVAILABLE的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!