PyKafka producer.get_delivery_report raising Queue.Empty when block=False

Question

I am currently working on a Kafka integration in Python. I come from a PHP background and am new to both Kafka and Python.

I have managed to get the producer working, but it does not process messages fast enough because it waits for an ack from Kafka after each one.

The GitHub page (https://github.com/Parsely/pykafka) gives the following example, which should produce messages asynchronously while still allowing delivery reports:

>>> with topic.get_producer(delivery_reports=True) as producer:
...     count = 0
...     while True:
...         count += 1
...         producer.produce('test msg', partition_key='{}'.format(count))
...         if count % 10**5 == 0:  # adjust this or bring lots of RAM ;)
...             while True:
...                 try:
...                     msg, exc = producer.get_delivery_report(block=False)
...                     if exc is not None:
...                         print 'Failed to deliver msg {}: {}'.format(
...                             msg.partition_key, repr(exc))
...                     else:
...                         print 'Successfully delivered msg {}'.format(
...                         msg.partition_key)
...                 except Queue.Empty:
...                     break

I have modified the example. From testing, I can see that the first message is sent successfully, but a Queue.Empty exception is then raised.

Here is my modified code:

from pykafka import KafkaClient
import Queue
import json

client = KafkaClient(hosts='1.1.1.1:9092')
topic = client.topics['test']


sync = False
# sync = True

if sync:
    with topic.get_sync_producer() as producer:
        count = 0
        while True:
            count += 1
            producer.produce('Test message ' + str(count))
            print 'Sent message ' + str(count)
else:
    with topic.get_producer(delivery_reports=True) as producer:
        count = 0
        while True:
            count += 1
            if count >= 100:
                print 'Processed 100 messages'
                break
            producer.produce('Test message ' + str(count))
            while True:
                try:
                    msg, exc = producer.get_delivery_report(block=False)
                    if exc is not None:
                        print 'Failed to deliver msg {}: {}'.format(msg.offset, repr(exc))
                    else:
                        print 'Successfully delivered msg {}'.format(msg.offset)
                except Queue.Empty:
                    print 'Queue.empty'
                    break

Output:

/Users/jim/Projects/kafka_test/env/bin/python /Users/jim/Projects/kafka_test/producer.py
Queue.empty
...
... x100
Processed 100 messages

Checking my consumer, I can see that all 100 messages were sent successfully, but I cannot tell this from the producer.

Do you have any suggestions for improving this implementation? More specifically, how can I increase throughput while still being able to check that each message was delivered?

Recommended answer

I found a GitHub issue related to this: https://github.com/Parsely/pykafka/issues/291

I fixed this by lowering min_queued_messages to 1.

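import time

# Reuses `topic` from the question's code. With min_queued_messages=1 the
# producer starts flushing as soon as a single message is queued.
with topic.get_sync_producer(min_queued_messages=1) as producer:
    count = 0
    while True:
        count += 1
        time_start = time.time()
        producer.produce('Test message ' + str(count))
        time_end = time.time()

        print 'Sent message %d, %ss duration' % (count, (time_end - time_start))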
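
Note that the snippet above uses the synchronous producer, so each produce() call still waits for its ack. If the goal is throughput with delivery reports, one possible approach (a sketch, not something taken from the PyKafka docs) is to keep the asynchronous producer from the question and wait briefly for reports instead of polling with block=False right after produce(). The asynchronous producer batches messages, so a report may simply not exist yet at that moment. The helper name drain_delivery_reports and its expected and timeout parameters are hypothetical; the only PyKafka call it relies on is producer.get_delivery_report(block=..., timeout=...).

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as used in the question


def drain_delivery_reports(producer, expected, timeout=10.0):
    """Collect up to `expected` delivery reports from `producer`.

    Hypothetical helper: waits up to `timeout` seconds for each report
    and stops early if none arrives in that time.
    Returns (delivered, failed), where `failed` holds (msg, exc) pairs.
    """
    delivered, failed = [], []
    for _ in range(expected):
        try:
            # block=True waits for a report instead of raising
            # queue.Empty immediately when the batch is not flushed yet.
            msg, exc = producer.get_delivery_report(block=True, timeout=timeout)
        except queue.Empty:
            break  # nothing arrived within `timeout`; give up
        if exc is None:
            delivered.append(msg)
        else:
            failed.append((msg, exc))
    return delivered, failed


# Assumed usage with the `topic` object from the question:
#
# with topic.get_producer(delivery_reports=True) as producer:
#     for count in range(1, 101):
#         producer.produce(b'Test message ' + str(count).encode())
#     delivered, failed = drain_delivery_reports(producer, expected=100)
```

Producing the whole batch first and collecting reports afterwards keeps the producer asynchronous, at the cost of learning about failures a little later. The timeout value is a guess and would need tuning against the producer's batching settings.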
