等待KafkaJS的领导人选举 [英] Waiting for leadership elections in KafkaJS

查看:73
本文介绍了等待KafkaJS的领导人选举的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 kafkajs 来写一些动态生成的kafka主题.

I am using kafkajs to write to some dynamically generated kafka topics.

注册生产者后,我发现立即写这些主题经常会导致错误:由于我们正处于领导层选举之中,因此没有主题分区的领导者.

I am finding writing to those topics immediately after registering my producer will regularly cause an error: There is no leader for this topic-partition as we are in the middle of a leadership election.

完整错误是:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

代码

这是引起问题的代码:

The Code

Here is the code that is causing the problem:

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

问题

两个问题:

  1. 在连接到生产者(或发送)时,为了确保逻辑阻塞直到生产者真正准备好将数据发送到kafka主题之前,我应该做些什么特别的事情吗?
  2. 向生产者发送数据时,我有什么特别的事情要做,以确保消息不会被丢弃?

推荐答案

解决方案

Kafkajs通过管理客户端提供了一种 createTopics 方法.有一个可选的 waitForLeaders 标志:

The Solution

Kafkajs offers a createTopics method through the admin client which has an optional waitForLeaders flag:

admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
}

使用此方法可以解决问题.

Using this resolves the problem.

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

不幸的是,如果该主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑该错误比中断更有意义.

Unfortunately this will result in a different error if the topic already existed, but that's a separate question and I suspect that error is more informational than breaking.

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}

这篇关于等待KafkaJS的领导人选举的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆