如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行 [英] How to programmatically check if Kafka Broker is up and running in Python

查看:46
本文介绍了如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用来自 Kafka 主题的消息.我在 confluent_kafka 消费者周围使用了一个包装器.在开始消费消息之前,我需要检查是否建立了连接.

I'm trying to consume messages from a Kafka topic. I'm using a wrapper around confluent_kafka consumer. I need to check if connection is established before I start consuming messages.

我读到消费者很懒惰,所以我需要执行一些操作才能建立连接.但是我想在不执行consumepoll 操作的情况下检查连接建立.

I read that the consumer is lazy, so I need to perform some action for the connection to get established. But I want to check the connection establishment without doing a consume or poll operation.

此外,我尝试给出一些错误的配置,以查看民意调查的响应.我得到的回应是:

Also, I tried giving some bad configurations to see what the response on a poll would be. The response I got was:

b'Broker: No more messages'

那么,我该如何判断是连接参数错误、连接中断还是主题中实际上没有消息?

So, how do I decide if the connection parameters are faulty, the connection is broken, or there actually are no messages in the topic?

推荐答案

恐怕没有直接的方法来测试 Kafka Brokers 是否已启动并运行.另请注意,如果您的消费者已经消费了这些消息,这并不意味着这是一种不良行为,而且显然这并不表示 Kafka 代理已关闭.

I am afraid there is no direct approach for testing whether Kafka Brokers are up and running. Also note that if your consumer has already consumed the messages it doesn't mean that this is a bad behaviour and obviously it does not indicate that the Kafka broker is down.

一种可能的解决方法是执行某种快速操作并查看代理是否响应.一个例子是列出主题:

A possible workaround would be to perform some sort of quick operation and see if the broker responds. An example would be listing the topics:

使用confluent-kafka-pythonAdminClient

# Example using confuent_kafka
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics

if not topics: 
    raise RuntimeError()

使用 kafka-pythonKafkaConsumer

# example using kafka-python
import kafka


consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()

if not topics: 
    raise RuntimeError()

这篇关于如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆