如何在程序中停止Python Kafka Consumer? [英] How to stop Python Kafka Consumer in program?

查看:1290
本文介绍了如何在程序中停止Python Kafka Consumer?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做Python Kafka使用者(尝试在

I am doing Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer in http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). When I run the following piece of code, it will run all the time, even if all messages consumed. I hope the consumer will stop if it consume all the messages. How to do it? Also I have no idea how to use stop() function (which is in base class kafka.consumer.base.Consumer).

更新

我使用信号处理程序来调用consumer.stop().一些错误消息被打印到屏幕上.但是程序仍然停留在for循环中.当收到新消息时,消费者将它们消耗掉并打印出来.我也尝试过client.close().但是结果还是一样.

I used signal handler to call consumer.stop(). Some error messages were printed out to the screen. But the program still was stuck in the for-loop. When new messages came in, the consumer consumed them and printed them. I also tried client.close(). But the same result.

我需要一些方法来优雅地停止for循环.

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

欢迎任何帮助.谢谢.

推荐答案

使用iter_timeout参数设置等待时间.如果将其设置为10,则与下面的代码段一样,如果在10秒内没有新消息出现,它将退出.默认值为无",这意味着即使没有新消息进入,使用者也将在此处阻止.

Use the iter_timeout parameter to set the waiting time. If set to 10, just like the following piece of code, it will exit if no new message come in in 10 seconds. The default value is None, which means that the consumer will block here even if no new messages come in.

        self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)

更新

以上不是一个好的方法.当有大量消息进入时,很难设置足够小的iter_timeout来保证停止.因此,现在,我正在使用get_message()函数,该函数尝试消耗一条消息并停止.没有新消息时不返回任何内容.

The above is not a good method. When lots of messages come in, it is hard to set a small enough iter_timeout to guarantee the stopping. So, now, I am using get_message() function, which try to consume one message and stop. None is returned when no new messages.

这篇关于如何在程序中停止Python Kafka Consumer?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆