仅读取来自kafka主题的特定消息 [英] reading only specific messages from kafka topic

查看:112
本文介绍了仅读取来自kafka主题的特定消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

场景:

我正在将数据JSON对象数据写入kafka主题,同时阅读,我想根据消息中存在的值读取一组特定的消息.我正在使用kafka-python库.

I am writing data JSON object data into kafka topic while reading I want to read an only specific set of messages based on the value present in the message. I am using kafka-python library.

示例消息:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

在这里,我只想阅读flow_Status完成的消息.

Here I want to read only messages having flow_Status as completed.

推荐答案

在Kafka中,不可能做这样的事情. 使用者从最新提交的偏移量开始(或者从头开始,或以特定的偏移量进行查找),一个接一个地消费消息. 根据您的用例,您可能在场景中会有不同的流程:将要执行流程的消息放入一个主题,然后处理该操作的应用程序,然后将结果(完成或失败)写在两个不同的主题中:这样,您就可以将所有工作从失败中分离出来. 另一种方法是使用Kafka Streams应用程序进行过滤,但考虑到它只是一个糖,实际上,Streams应用程序将始终读取所有消息,但允许您轻松过滤消息.

In Kafka it's not possible doing something like that. The consumer consumes messages one by one, one after the other starting from the latest committed offset (or from the beginning, or seeking at a specific offset). Depends on your use case, maybe you could have a different flow in your scenario: the message taking the process to do goes into a topic but then the application which processes the action, then writes the result (completed or failed) in two different topics: in this way you have all completed separated from failed. Another way is to use a Kafka Streams application for doing the filtering but taking into account that it's just a sugar, in reality the streams application will always read all the messages but allowing you to filter messages easily.

这篇关于仅读取来自kafka主题的特定消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆