How does a consumer know it is no longer listed in the Kafka cluster?


Question


We have an issue where, when Kafka brokers must be taken offline, no consumer service has any idea about it and keeps running.

We tried listing consumers in the new Kafka instance and saw no existing consumer listed there. All the consumers listed are newly created ones.

Every time we hit this issue we have to manually terminate all existing consumer services, which is not convenient.

Question - How does a consumer know it is no longer listed in the Kafka cluster so it should terminate itself?

P.S. We use Spring Kafka.

Solution

1 -- Checking cluster and replica status

Check the status of all brokers in the Kafka cluster:

$ zookeeper-shell.sh localhost:9001 ls /brokers/ids

Check the status of a specific broker in the Kafka cluster:

$ zookeeper-shell.sh localhost:9001 get /brokers/ids/<id> 

Specific check for replica_unavailability:

$ kafka-check --cluster-type=sample_type replica_unavailability

Check for the first broker only:

$ kafka-check --cluster-type=sample_type --broker-id 3 replica_unavailability --first-broker-only

Check whether any partition replicas are unavailable:

$ kafka-check --cluster-type=sample_type replica_unavailability

Check for offline partitions:

$ kafka-check --cluster-type=sample_type offline
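
The same checks can also be done programmatically, which gives a consumer-side service its own way to notice that brokers have dropped out of the cluster. Below is a minimal sketch, not from the original answer, assuming the brokers register in ZooKeeper at localhost:9001 (as in the commands above) and that the kazoo ZooKeeper client library is available.

# Minimal sketch (assumption): check /brokers/ids with kazoo and flag the
# service to shut down when no brokers are registered anymore.
import json
import threading

from kazoo.client import KazooClient

shutdown = threading.Event()

zk = KazooClient(hosts='localhost:9001')   # same ZooKeeper as zookeeper-shell.sh above
zk.start()

# One-off check: list broker ids, then fetch one broker's registration data.
broker_ids = zk.get_children('/brokers/ids')            # equivalent of: ls /brokers/ids
print("registered brokers:", broker_ids)
if broker_ids:
    data, _ = zk.get('/brokers/ids/' + broker_ids[0])   # equivalent of: get /brokers/ids/<id>
    print(json.loads(data))

# Continuous watch: kazoo calls this whenever the set of registered brokers changes.
@zk.ChildrenWatch('/brokers/ids')
def on_brokers_change(children):
    if not children:
        # Runs on a kazoo thread, so just flag the main loop instead of exiting here.
        shutdown.set()

# ... in the consumer service's main loop, poll shutdown.is_set() and exit cleanly.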


2 -- Code samples to send a kill-message / auto-shutdown

Two custom options to handle the shutdown gracefully with a kill-message: send the kill-message before taking down brokers or topics.

Option 1: an in-band message/signal - i.e. send a "kill" message on the topics the consumer is already listening to, so that it follows the offset order on the topic-partition.

Option 2: have the consumer listen to 2 topics, e.g. "topic" and "topic_kill".

The difference between the two options is that in the first version the kill-message arrives in the order it was sent; depending on your implementation, there may still be blocking messages waiting to be consumed before that kill-message.

The second version lets the kill-signal arrive out of band, without being blocked behind pending data messages. This is a nicer, reusable architectural pattern, with a clear separation between the data topic and signaling.

Code samples: a) producer sending the kill-message and b) consumer receiving and handling the shutdown

# Producer -- modify and adapt as needed
import json

from kafka import KafkaProducer


# Keys are sent as plain UTF-8 strings, values as JSON.
producer = KafkaProducer(bootstrap_servers=['0.0.0.0:<my port number>'],
                         key_serializer=lambda m: m.encode('utf8'),
                         value_serializer=lambda m: json.dumps(m).encode('utf8'))


def send_kill(topic: str, partitions: list[int]):
    # Send the kill-message to every given partition of the topic, then flush
    # so the messages are actually delivered before the brokers go down.
    for p in partitions:
        producer.send(topic, key='kill', partition=p)
    producer.flush()
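
A hypothetical usage example (an assumption, reusing the producer above and a data topic named "topic"): producer.partitions_for() returns the topic's partition ids, so the kill-message is fanned out to every partition and each consumer in the group sees it.

# Hypothetical usage (assumption): send the kill-message to every partition of the data topic.
send_kill('topic', partitions=sorted(producer.partitions_for('topic')))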


# Consumer to accept a kill-message -- please modify and adapt as needed
import json

from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata, TopicPartition


# Keys are read as plain UTF-8 strings and values as JSON, mirroring the producer.
consumer = KafkaConsumer(bootstrap_servers=['0.0.0.0:<my port number>'],
                         key_deserializer=lambda m: m.decode('utf8'),
                         value_deserializer=lambda m: json.loads(m.decode('utf8')),
                         auto_offset_reset="earliest",
                         group_id='1')

consumer.subscribe(['topic'])

for msg in consumer:
    tp = TopicPartition(msg.topic, msg.partition)
    # Commit msg.offset + 1: the committed offset is the position of the *next*
    # record to read, so this record is not re-consumed after a restart.
    offsets = {tp: OffsetAndMetadata(msg.offset + 1, None)}

    if msg.key == "kill":
        # Acknowledge the kill-message, drop the subscription and shut down.
        consumer.commit(offsets=offsets)
        consumer.unsubscribe()
        exit(0)

    # do your work...

    consumer.commit(offsets=offsets)
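
And a minimal sketch of Option 2, not from the original answer: it assumes the same consumer object as above, subscribed to both the data topic and a separate "topic_kill" topic (both names are placeholders), so the kill-signal is not blocked behind pending data messages.

# Option 2 sketch (assumption): listen on the data topic plus a dedicated kill topic.
consumer.subscribe(['topic', 'topic_kill'])

for msg in consumer:
    if msg.topic == 'topic_kill':
        # Out-of-band kill-signal: shut down regardless of pending data messages.
        consumer.unsubscribe()
        exit(0)

    # do your work on messages from 'topic'...
    tp = TopicPartition(msg.topic, msg.partition)
    consumer.commit(offsets={tp: OffsetAndMetadata(msg.offset + 1, None)})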
