有时,新的消费群体无效 [英] Sometimes a new consumer group does not work

查看:305
本文介绍了有时,新的消费群体无效的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我曾经在生产环境中看到过这种情况(我不记得我们是如何解决的),现在我可以在集成测试中重复进行,集成测试总是从全新的Kafka安装开始。步骤如下:

I've seen this in production once (I don't remember how we solved it) and now I can repeat it in the integration tests, which always start with a brand new Kafka installation. Here's how it goes:

步骤1:尚不存在的组的用户订阅了尚不存在的主题并开始轮询。

Step 1: A consumer of a group that doesn't exist yet subscribes to a topic that does not exist yet and starts polling.

self.kafka_consumer = confluent_kafka.Consumer({
    'group.id': 'mygroup',
    'bootstrap.servers': 'kafka:9092',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',
})
self.kafka_consumer.subscribe('mytopic')

第2步:生产者向该主题写消息。

Step 2: A producer writes a message to the topic.

结果:


  • 大约可以正常工作的一半;

  • 另一半时间,消费者似乎被卡住了。我尝试了长达10分钟的等待时间,看它是否会被卡住,但是不会。

  • 即使两个步骤都相反,即消费者尝试订阅已经存在的内容主题已经有消息,行为是相同的(但是组始终是新的)。

更多详细信息

消费者正在以2秒的超时进行轮询,如果没有结果,它将循环。

The consumer is polling with a timeout of 2 seconds, and if there's no result it loops over.

虽然该主题不存在,但 poll()返回 None 。主题存在后, poll()返回 msg ,其 error()。code( ) _PARTITION_EOF

While the topic doesn't exist, poll() returns None. After the topic exists, poll() returns an msg whose error().code() is _PARTITION_EOF.

当消费者似乎被卡住时,我问kafka怎么回事在 mygroup 上,它告诉我的是:

While the consumer seems stuck, I ask kafka what's going on with mygroup, and here's what it tells me:

root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
root@e7b124b4039c:/#

我试图通过读取另一个不存在的主题,例如 mygroup 来使其不被卡住: p>

I try to make it unstuck by trying to read another nonexistent topic as mygroup:

root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group mygroup --topic nonexistent --from-beginning
[2018-03-15 16:36:59,369] WARN [Consumer clientId=consumer-1, groupId=pixelprocessor] Error while fetching metadata with correlation id 2 : {nonexistent=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
root@e7b124b4039c:/#

完成之后,这就是卡夫卡对 mygroup 的评价:

After I do that, here's what Kafka has to say about mygroup:

root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
mytopic                        0          -               1               -          rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57      /172.20.0.6                    rdkafka
(another topic)                0          -               0               -          rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57      /172.20.0.6                    rdkafka
(a third topic)                0          -               0               -          rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57      /172.20.0.6                    rdkafka
nonexistent                    0          0               0               0          -                                                 -                              -

这是Kafka 1.0.1,librdkafka 0.11.3,confluent_kafka 0.11.0,在Ubuntu 16.04泊坞窗(带有操作系统的打包的zookeeper 3.4.8)上运行,它们在Linux 4.9.0-6-amd64的Debian扩展版(9.4)上运行。

This is Kafka 1.0.1, librdkafka 0.11.3, confluent_kafka 0.11.0, on Ubuntu 16.04 dockers (with the OS's packaged zookeeper 3.4.8) which are running on a Debian stretch (9.4) with Linux 4.9.0-6-amd64.

推荐答案

问题似乎出在 Consumer()参数中。这不能正常工作:

The problem seems to have been in the Consumer() arguments. This doesn't work properly:

self.kafka_consumer = confluent_kafka.Consumer({
    'group.id': 'mygroup',
    'bootstrap.servers': 'kafka:9092',
    'auto.offset.reset': 'earliest',
})

但这确实是:

self.kafka_consumer = confluent_kafka.Consumer({
    'group.id': 'mygroup',
    'bootstrap.servers': 'kafka:9092',
    'default.topic.config': {
        'auto.offset.reset': 'earliest',
    },
})

这篇关于有时,新的消费群体无效的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆