Kafka: consume messages in reverse order

Problem description

I use Kafka 0.10. I have a topic logs where my IoT devices post their logs; the key of my messages is the device-id, so all the logs of the same device end up in the same partition.

I have an API /devices/{id}/tail-logs that needs to display the N most recent logs of one device at the moment the call is made.

Currently I have it implemented in a very inefficient (but working) way: I start from the beginning of the partition containing the device's logs (i.e., the oldest ones) and read forward until I reach the current timestamp.

A more efficient way would be to get the current latest offset and then consume the messages backward (I would need to filter out some messages to keep only those of the device I'm looking for).

Is it possible to do this with Kafka? If not, how can this problem be solved? (A heavier solution I could see would be to link Kafka Connect to Elasticsearch and then query Elasticsearch, but adding two more components for this seems a bit overkill...)
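For reference, Kafka consumers can only read forward in offset order, but the idea described above can be approximated with the plain consumer API: look up the partition's end offset, seek back a fixed number of records, and read forward from there. Below is a minimal sketch of that idea; the broker address, group id, and N are illustrative, endOffsets requires a 0.10.1+ client, and the Duration-based poll shown here requires a 2.0+ client (older clients use poll(long)):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TailFromEnd {

    // Read roughly the last maxRecords messages of one partition and keep only
    // the values whose key matches the requested device-id.
    static List<String> tail(String topic, int partition, String deviceId, int maxRecords) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tail-logs-sketch");         // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        List<String> result = new ArrayList<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Collections.singletonList(tp));

            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            // Jump close to the end instead of replaying the whole partition.
            consumer.seek(tp, Math.max(0, end - maxRecords));

            while (consumer.position(tp) < end) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    if (deviceId.equals(record.key())) {  // filter out other devices
                        result.add(record.value());
                    }
                }
            }
        }
        return result;
    }
}
```

Since a partition holds logs from several devices, the last maxRecords offsets may contain fewer than maxRecords logs for the requested device, so the caller would have to seek further back in that case.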

Recommended answer

As you are on 0.10.2, I would recommend writing a Kafka Streams application. The application will be stateful, and the state will hold the last N records/logs per device-id. When new data is written to the input topic, the Kafka Streams application just updates its state (without needing to re-read the whole topic).
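A minimal sketch of such a topology is shown below. It is written against a newer Kafka Streams DSL (2.x) than the 0.10.2 release mentioned above; the older KStreamBuilder-based DSL differs in method signatures, but the idea is the same. The application id, topic name, store name, and N = 100 are assumptions, and to avoid a custom serde the per-device state is kept as a single newline-delimited string:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class LastNLogsApp {

    static final String STORE_NAME = "last-logs-store";  // hypothetical store name
    static final int N = 100;                            // log lines to keep per device

    public static KafkaStreams build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-tail-logs");   // assumption
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("logs", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .aggregate(
                   () -> "",  // initial state: no logs for this device yet
                   (deviceId, log, agg) -> {
                       // Append the new log line and keep only the last N lines.
                       String[] lines = (agg.isEmpty() ? log : agg + "\n" + log).split("\n");
                       int from = Math.max(0, lines.length - N);
                       return String.join("\n", Arrays.copyOfRange(lines, from, lines.length));
                   },
                   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                           .withKeySerde(Serdes.String())
                           .withValueSerde(Serdes.String()));

        return new KafkaStreams(builder.build(), props);
    }

    public static void main(String[] args) {
        build().start();
    }
}
```

Every new record only updates the per-device entry in the materialized state store; nothing is ever re-read from the beginning of the topic.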

Furthermore, the application can also serve your requests (the /devices/{id}/tail-logs API) using the Interactive Queries feature.
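For the read path, a sketch of how the endpoint could query the materialized store via Interactive Queries, assuming the store name from the topology above and a 2.5+ client (older versions use streams.store(name, type) instead of StoreQueryParameters):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class TailLogsQuery {

    // Hypothetical handler body for GET /devices/{id}/tail-logs: look up the
    // pre-computed "last N lines" for one device in the local state store.
    public static String tailLogs(KafkaStreams streams, String deviceId) {
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "last-logs-store", QueryableStoreTypes.keyValueStore()));
        return store.get(deviceId);  // newline-delimited last N lines, or null if unknown device
    }
}
```

Note that with several application instances, a given device-id lives on only one of them, so the HTTP layer would have to route the request to the instance hosting that key (Kafka Streams exposes this via queryMetadataForKey).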

Thus, I would not build a stateless application that has to recompute the answer for each request, but rather a stateful application that eagerly computes the result (and keeps it up to date automatically) for all possible requests (i.e., for all device-ids) and simply returns the already computed result when a request comes in.
