Python Producer可以通过外壳发送,但不能通过.py发送 [英] Python Producer can send via shell, but not .py

查看:209
本文介绍了Python Producer可以通过外壳发送,但不能通过.py发送的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个正在运行且经过测试的Kafka集群,并且正在尝试使用Python脚本将消息发送给代理.当我使用Python3 shell并调用producer方法时,此方法有效,但是当我将这些相同的命令放入python文件并执行时-脚本似乎挂起.

I have a running and tested Kafka cluster, and am trying to use a Python script to send messages to the brokers. This works when I use the Python3 shell and call the producer method, however when I put these same commands into a python file and execute it - the script seems to hang.

我正在为消费者和生产者使用kafka-python库.当我使用Python3 Shell时,可以看到使用Kafka GUI工具2.0.4出现在主题中的消息 我已经在python代码中尝试了各种循环和语句,但是似乎没有什么可以使其运行"到完成.

I am using the kafka-python library for the consumer and producer. When I use the Python3 shell I can see the messages appear in the topic using Kafka GUI tool 2.0.4 I've tried various loops and statements in the python code, but nothing seems to make it 'run' to completion.

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>

这有效并且字节出现在代理主题数据中.

And this works and bytes appears in the broker topic data.

当我在python .py文件中放入与上面相同的代码并使用Python3执行时,它完成了,但是没有数据发送到Kafka代理. 也没有显示错误.

When I put the same code as above in a python .py file and execute with Python3 it completes, but no data is sent to Kafka broker. No error shown either.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')

推荐答案

如您所见,它返回了未来.

As you can see, it returns a future.

Kafka客户端将批处理记录,它们不会一次立即发送一条记录,要做到这一点,您需要等待或刷新生产者缓冲区,以便在应用程序退出之前发送.换句话说,交互式终端将生产者数据保留在内存中,并在后台运行,而另一种方式则丢弃该数据

Kafka clients will batch records, they don't immeadiately send one record at a time, and to make it do that, you will need to wait or flush the producer buffer so that it'll send before the app exits. In other words, the interactive terminal keeps the producer data in-memory, running in the background, and the other way discards that data

作为文档,请显示

future = producer.send(...)

try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

或者,如果您不关心元数据或抓住未来,请放下producer.flush().

Or just put producer.flush(), if you don't care about the metadata or grabbing the future.

这篇关于Python Producer可以通过外壳发送,但不能通过.py发送的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆