发布到kafka主题的典型示例 [英] Faust example of publishing to a kafka topic
问题描述
我很好奇您应该如何表达自己想要在浮士德时代传递给Kafka主题的消息.他们的自述文件中的示例似乎未写入主题:
I'm curious about how you are supposed to express that you want a message delivered to a Kafka topic in faust. The example in their readme doesn't seem to write to a topic:
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()
我希望上面的代码中的 hello.send
可以向该主题发布消息,但似乎没有.
I would expect hello.send
in the above code to publish a message to the topic, but it doesn't appear to.
有许多阅读主题的示例,还有许多使用cli推送即席消息的示例.梳理完文档后,我看不到任何明确的示例来发布代码中的主题.我只是发疯了,上面的代码应该可以工作吗?
There are many examples of reading from topics, and many examples of using the cli to push an ad-hoc message. After combing through the docs, I don't see any clear examples of publishing to topics in code. Am I just being crazy and the above code should work?
推荐答案
send()
函数是调用以写入主题的正确函数.您甚至可以指定一个特定的分区,就像等效的Java API调用一样.
The send()
function is the correct one to call to write to topics. You can even specify a particular partition, just like the equivalent Java API call.
这是 send()
方法的引用:
https://faust.阅读thedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send
这篇关于发布到kafka主题的典型示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!