仅读取来自 kafka 主题的特定消息 [英] reading only specific messages from kafka topic

查看:30
本文介绍了仅读取来自 kafka 主题的特定消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

场景:

我正在将数据 JSON 对象数据写入 kafka 主题,同时阅读我想根据消息中存在的值仅读取一组特定的消息.我正在使用 kafka-python 库.

I am writing data JSON object data into kafka topic while reading I want to read an only specific set of messages based on the value present in the message. I am using kafka-python library.

示例消息:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

这里我只想读取 flow_Status 为已完成的消息.

Here I want to read only messages having flow_Status as completed.

推荐答案

在 Kafka 中不可能做这样的事情.消费者从最近提交的偏移量开始(或从头开始,或在特定偏移量处寻找)一个接一个地消费消息.取决于您的用例,也许您的场景中可能有不同的流程:处理过程的消息进入一个主题,然后处理操作的应用程序,然后将结果(完成或失败)写入两个不同的主题: 这样你就全部完成了,从失败中分离出来了.另一种方法是使用 Kafka Streams 应用程序进行过滤,但考虑到它只是一个糖,实际上 Streams 应用程序将始终读取所有消息,但允许您轻松过滤消息.

In Kafka it's not possible doing something like that. The consumer consumes messages one by one, one after the other starting from the latest committed offset (or from the beginning, or seeking at a specific offset). Depends on your use case, maybe you could have a different flow in your scenario: the message taking the process to do goes into a topic but then the application which processes the action, then writes the result (completed or failed) in two different topics: in this way you have all completed separated from failed. Another way is to use a Kafka Streams application for doing the filtering but taking into account that it's just a sugar, in reality the streams application will always read all the messages but allowing you to filter messages easily.

这篇关于仅读取来自 kafka 主题的特定消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆