发布到kafka主题的典型示例 [英] Faust example of publishing to a kafka topic

查看:86
本文介绍了发布到kafka主题的典型示例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我很好奇您应该如何表达自己想要在浮士德时代传递给Kafka主题的消息.他们的自述文件中的示例似乎未写入主题:

I'm curious about how you are supposed to express that you want a message delivered to a Kafka topic in faust. The example in their readme doesn't seem to write to a topic:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

我希望上面的代码中的 hello.send 可以向该主题发布消息,但似乎没有.

I would expect hello.send in the above code to publish a message to the topic, but it doesn't appear to.

有许多阅读主题的示例,还有许多使用cli推送即席消息的示例.梳理完文档后,我看不到任何明确的示例来发布代码中的主题.我只是发疯了,上面的代码应该可以工作吗?

There are many examples of reading from topics, and many examples of using the cli to push an ad-hoc message. After combing through the docs, I don't see any clear examples of publishing to topics in code. Am I just being crazy and the above code should work?

推荐答案

send()函数是调用以写入主题的正确函数.您甚至可以指定一个特定的分区,就像等效的Java API调用一样.

The send() function is the correct one to call to write to topics. You can even specify a particular partition, just like the equivalent Java API call.

这是 send()方法的引用:

https://faust.阅读thedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send

这篇关于发布到kafka主题的典型示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆