如何在程序中停止 Python Kafka Consumer? [英] How to stop Python Kafka Consumer in program?

查看:42
本文介绍了如何在程序中停止 Python Kafka Consumer?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做 Python Kafka 消费者(尝试在 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html).当我运行以下代码时,它会一直运行,即使所有消息都被消耗了.我希望消费者在消费完所有消息后停止.怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Consumer 中).

I am doing Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer in http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). When I run the following piece of code, it will run all the time, even if all messages consumed. I hope the consumer will stop if it consume all the messages. How to do it? Also I have no idea how to use stop() function (which is in base class kafka.consumer.base.Consumer).

更新

我使用信号处理程序调用consumer.stop().一些错误信息被打印到屏幕上.但是程序仍然卡在 for 循环中.当新消息进来时,消费者消费它们并打印它们.我也试过 client.close().但同样的结果.

I used signal handler to call consumer.stop(). Some error messages were printed out to the screen. But the program still was stuck in the for-loop. When new messages came in, the consumer consumed them and printed them. I also tried client.close(). But the same result.

我需要一些方法来优雅地停止 for 循环.

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

欢迎任何帮助.谢谢.

推荐答案

使用 iter_timeout 参数设置等待时间.如果设置为10,就像下面这段代码一样,如果10秒内没有新消息进来,它就会退出.默认值为None,表示即使没有新消息进来,消费者也会在这里阻塞.

Use the iter_timeout parameter to set the waiting time. If set to 10, just like the following piece of code, it will exit if no new message come in in 10 seconds. The default value is None, which means that the consumer will block here even if no new messages come in.

        self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)

更新

以上不是一个好方法.当大量消息进来时,很难设置足够小的 iter_timeout 来保证停止.所以,现在,我正在使用 get_message() 函数,它尝试使用一条消息并停止.没有新消息时不返回任何内容.

The above is not a good method. When lots of messages come in, it is hard to set a small enough iter_timeout to guarantee the stopping. So, now, I am using get_message() function, which try to consume one message and stop. None is returned when no new messages.

这篇关于如何在程序中停止 Python Kafka Consumer?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆