Python Producer 可以通过 shell 发送,但不能通过 .py [英] Python Producer can send via shell, but not .py

查看:26
本文介绍了Python Producer 可以通过 shell 发送,但不能通过 .py的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个正在运行并经过测试的 Kafka 集群,我正在尝试使用 Python 脚本向代理发送消息.这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起.

I have a running and tested Kafka cluster, and am trying to use a Python script to send messages to the brokers. This works when I use the Python3 shell and call the producer method, however when I put these same commands into a python file and execute it - the script seems to hang.

我正在为消费者和生产者使用 kafka-python 库.当我使用 Python3 shell 时,我可以看到使用 Kafka GUI 工具 2.0.4 的主题中出现的消息我在 python 代码中尝试了各种循环和语句,但似乎没有什么让它运行"完成.

I am using the kafka-python library for the consumer and producer. When I use the Python3 shell I can see the messages appear in the topic using Kafka GUI tool 2.0.4 I've tried various loops and statements in the python code, but nothing seems to make it 'run' to completion.

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>

这是有效的,字节出现在代理主题数据中.

And this works and bytes appears in the broker topic data.

当我将与上述相同的代码放在 python .py 文件中并使用 Python3 执行时,它会完成,但没有数据发送到 Kafka 代理.也没有显示错误.

When I put the same code as above in a python .py file and execute with Python3 it completes, but no data is sent to Kafka broker. No error shown either.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')

推荐答案

如你所见,它返回一个未来.

As you can see, it returns a future.

Kafka 客户端将批量记录,他们不会一次立即发送一条记录,要做到这一点,您需要等待或刷新生产者缓冲区,以便它在应用程序退出之前发送.换句话说,交互式终端将生产者数据保存在内存中,在后台运行,反之则丢弃该数据

Kafka clients will batch records, they don't immeadiately send one record at a time, and to make it do that, you will need to wait or flush the producer buffer so that it'll send before the app exits. In other words, the interactive terminal keeps the producer data in-memory, running in the background, and the other way discards that data

作为文档,展示

future = producer.send(...)

try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

或者直接放producer.flush(),如果你不关心元数据或者抓未来.

Or just put producer.flush(), if you don't care about the metadata or grabbing the future.

这篇关于Python Producer 可以通过 shell 发送,但不能通过 .py的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆