在 KafkaJS 中等待领导选举 [英] Waiting for leadership elections in KafkaJS
问题描述
情况
我正在使用 kafkajs 写入一些动态生成的 kafka 主题.
我发现在注册我的生产者后立即写入这些主题会经常导致错误:这个主题分区没有领导者,因为我们正处于领导者选举中
.>
完整的错误是:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":[Connection] Response Metadata(key: 3, version: 5)",broker":localhost:9092",clientId":tv-kitchen",error":";这个主题分区没有领导者,因为我们正处于领导者选举中",correlationId":1,大小":146}
代码
这是导致问题的代码:
从'myConfiguredKafkaJs'导入kafkaconst run = async() =>{const 生产者 = kafka.producer()等待 producer.connect()生产者.发送({主题:'myRandomTopicString',消息:[{价值:'yolo',}],})}跑()
问题
两个问题:
- 在连接到生产者(或发送)以确保逻辑阻塞直到生产者真正准备好将数据发送到 kafka 主题时,我应该做什么特别的事情吗?
- 在向生产者发送数据以确保消息不会被丢弃时,我应该做什么特别的事情吗?
解决方案
Kafkajs 通过 admin 客户端 提供了一个 createTopics
方法有一个可选的 waitForLeaders
标志:
admin.createTopics({waitForLeaders:真,话题: [{ 主题:'myRandomTopicString123' },],}
使用它可以解决问题.
从'myConfiguredKafkaJs'导入kafkaconst run = async() =>{const 生产者 = kafka.producer()const admin = kafka.admin()等待 admin.connect()等待 producer.connect()等待 admin.createTopics({waitForLeaders:真,话题: [{ 主题:'myRandomTopicString123' },],})生产者.发送({主题:'myRandomTopicString',消息:[{价值:'yolo',}],})}跑()
不幸的是,如果主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑该错误比破坏性更具信息性.
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":";具有此名称的主题已存在"、correlationId":2、size":86}
上述设置确实需要正确配置您的 Kafka 实例.有可能领导选举永远不会解决,在这种情况下,KafkaJS 仍然会抱怨领导选举!
根据我的经验,这是由于 kafka 代理在没有从 Zookeeper 注销的情况下被停止的情况,这意味着 Zookeeper 正在等待不再存在的事物的响应.
The Situation
I am using kafkajs to write to some dynamically generated kafka topics.
I am finding writing to those topics immediately after registering my producer will regularly cause an error: There is no leader for this topic-partition as we are in the middle of a leadership election
.
The full error is:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
The Code
Here is the code that is causing the problem:
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
await producer.connect()
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
The Question
Two questions:
- Is there anything special I should be doing when connecting to the producer (or sending) in order to ensure that logic blocks until the producer is truly ready to send data to a kafka topic?
- Is there anything special I should be doing when sending data to the producer in order to ensure that messages are not dropped?
The Solution
Kafkajs offers a createTopics
method through the admin client which has an optional waitForLeaders
flag:
admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
}
Using this resolves the problem.
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
const admin = kafka.admin()
await admin.connect()
await producer.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
})
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
Unfortunately this will result in a different error if the topic already existed, but that's a separate question and I suspect that error is more informational than breaking.
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}
EDIT: the above settings do require that your Kafka instance is properly configured. It is possible to have leadership elections never resolve, in which case KafkaJS will still complain about leadership elections!
In my experience this has been due to situations where a kafka broker was stopped without being de-registered from zookeeper, meaning zookeeper is waiting for a response from something that no longer exists.
这篇关于在 KafkaJS 中等待领导选举的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!