Kafka - 如何捕获kafka客户端后台线程生成的消息 [英] Kafka - How to capture messages generated by kafka client background thread

查看:83
本文介绍了Kafka - 如何捕获kafka客户端后台线程生成的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用以下配置模拟消费者关闭/会话超时.我们如何捕获客户端记录到控制台的消息 - SESSSTMOUT|rdkafka#consumer-1|[第三:主要]

Using the below configuration to simulate consumer going down /session timeout. How can we capture messages logged to the console by client - SESSTMOUT|rdkafka#consumer-1| [thrd:main]

consumed message None: msg1: 0: first_topic: 0: None
consumed message None: msg2: 1: first_topic: 0: None
no message received by consumer
no message received by consumer
%4|1603348021.170|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10005 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
no message received by consumer
no message received by consumer
no message received by consumer
%4|1603276138.721|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (30000ms) exceeded by 7ms (adjust max.poll.interval.ms for long-running message processing): leaving group
error from consumer KafkaError{code=_MAX_POLL_EXCEEDED,val=-147,str="Application maximum poll interval (30000ms) exceeded by 7ms"}

from confluent_kafka import Consumer
def consume():
    c = Consumer({"bootstrap.servers": "localhost:9092", 
                  "group.id": "group1",
                  "enable.auto.commit": False,
                  "auto.offset.reset": "earliest",
                  "max.poll.interval.ms": 30000,
                  "session.timeout.ms": 10000,
                  "heartbeat.interval.ms": 15000
                  })
    c.subscribe(["first_topic"])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value().decode('utf-8')}: {message.offset()}: {message.topic()}: {message.partition()}: {message.headers()}")
        time.sleep(10)

推荐答案

heartbeat.interval.ms 必须低于 session.timeout.ms

session.timeout.ms * 1/3 <= heartbeat.interval.ms

session.timeout.ms * 1/3 <= heartbeat.interval.ms < session.timeout.ms

这篇关于Kafka - 如何捕获kafka客户端后台线程生成的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆