发布到 kafka 主题的 Faust 示例 [英] Faust example of publishing to a kafka topic

查看:24
本文介绍了发布到 kafka 主题的 Faust 示例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我很好奇您应该如何表达您希望将消息传递到 Kafka 主题的 faust.他们自述文件中的示例似乎没有写入主题:

I'm curious about how you are supposed to express that you want a message delivered to a Kafka topic in faust. The example in their readme doesn't seem to write to a topic:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

我希望上面代码中的 hello.send 向主题发布消息,但它似乎没有.

I would expect hello.send in the above code to publish a message to the topic, but it doesn't appear to.

从topic中读取的例子有很多,还有很多使用cli推送ad-hoc消息的例子.梳理文档后,我没有看到任何明确的代码发布主题示例.我只是疯了,上面的代码应该可以工作吗?

There are many examples of reading from topics, and many examples of using the cli to push an ad-hoc message. After combing through the docs, I don't see any clear examples of publishing to topics in code. Am I just being crazy and the above code should work?

推荐答案

send() 函数是为写入主题而调用的正确函数.您甚至可以指定一个特定的分区,就像等效的 Java API 调用一样.

The send() function is the correct one to call to write to topics. You can even specify a particular partition, just like the equivalent Java API call.

这里是 send() 方法的参考:

Here is the reference for the send() method:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send

这篇关于发布到 kafka 主题的 Faust 示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆