在 KafkaJS 中等待领导选举 [英] Waiting for leadership elections in KafkaJS

查看:37
本文介绍了在 KafkaJS 中等待领导选举的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

情况

我正在使用 kafkajs 写入一些动态生成的 kafka 主题.

我发现在注册我的生产者后立即写入这些主题会经常导致错误:这个主题分区没有领导者,因为我们正处于领导者选举中.>

完整的错误是:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":[Connection] Response Metadata(key: 3, version: 5)",broker":localhost:9092",clientId":tv-kitchen",error":";这个主题分区没有领导者,因为我们正处于领导者选举中",correlationId":1,大小":146}

代码

这是导致问题的代码:

从'myConfiguredKafkaJs'导入kafkaconst run = async() =>{const 生产者 = kafka.producer()等待 producer.connect()生产者.发送({主题:'myRandomTopicString',消息:[{价值:'yolo',}],})}跑()

问题

两个问题:

  1. 在连接到生产者(或发送)以确保逻辑阻塞直到生产者真正准备好将数据发送到 kafka 主题时,我应该做什么特别的事情吗?
  2. 在向生产者发送数据以确保消息不会被丢弃时,我应该做什么特别的事情吗?

解决方案

解决方案

Kafkajs 通过 admin 客户端 提供了一个 createTopics 方法有一个可选的 waitForLeaders 标志:

admin.createTopics({waitForLeaders:真,话题: [{ 主题:'myRandomTopicString123' },],}

使用它可以解决问题.

从'myConfiguredKafkaJs'导入kafkaconst run = async() =>{const 生产者 = kafka.producer()const admin = kafka.admin()等待 admin.connect()等待 producer.connect()等待 admin.createTopics({waitForLeaders:真,话题: [{ 主题:'myRandomTopicString123' },],})生产者.发送({主题:'myRandomTopicString',消息:[{价值:'yolo',}],})}跑()

不幸的是,如果主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑该错误比破坏性更具信息性.

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":";具有此名称的主题已存在"、correlationId":2、size":86}


上述设置确实需要正确配置您的 Kafka 实例.有可能领导选举永远不会解决,在这种情况下,KafkaJS 仍然会抱怨领导选举!

根据我的经验,这是由于 kafka 代理在没有从 Zookeeper 注销的情况下被停止的情况,这意味着 Zookeeper 正在等待不再存在的事物的响应.

The Situation

I am using kafkajs to write to some dynamically generated kafka topics.

I am finding writing to those topics immediately after registering my producer will regularly cause an error: There is no leader for this topic-partition as we are in the middle of a leadership election.

The full error is:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

The Code

Here is the code that is causing the problem:

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

The Question

Two questions:

  1. Is there anything special I should be doing when connecting to the producer (or sending) in order to ensure that logic blocks until the producer is truly ready to send data to a kafka topic?
  2. Is there anything special I should be doing when sending data to the producer in order to ensure that messages are not dropped?

解决方案

The Solution

Kafkajs offers a createTopics method through the admin client which has an optional waitForLeaders flag:

admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
}

Using this resolves the problem.

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

Unfortunately this will result in a different error if the topic already existed, but that's a separate question and I suspect that error is more informational than breaking.

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}


EDIT: the above settings do require that your Kafka instance is properly configured. It is possible to have leadership elections never resolve, in which case KafkaJS will still complain about leadership elections!

In my experience this has been due to situations where a kafka broker was stopped without being de-registered from zookeeper, meaning zookeeper is waiting for a response from something that no longer exists.

这篇关于在 KafkaJS 中等待领导选举的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆