Kafka: consume messages in reverse order


Question

I use Kafka 0.10 and I have a topic logs where my IoT devices post their logs. The key of my messages is the device-id, so all the logs of the same device are in the same partition.

I have an API /devices/{id}/tail-logs that needs to display the last N logs of one device at the moment the call is made.

Currently I have it implemented in a very inefficient (but working) way: I start from the beginning (i.e. the oldest logs) of the partition containing the device's logs and read forward until I reach the current timestamp.

A more efficient way would be to get the current latest offset and then consume the messages backwards (I would need to filter out some messages to keep only those of the device I'm looking for).

Is it possible to do this with Kafka? If not, how can this problem be solved? (A heavier solution would be to link Kafka Connect to Elasticsearch and then query Elasticsearch, but adding two more components for this seems a bit overkill...)
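For reference, here is a minimal sketch of what the question describes using the plain consumer API. Kafka cannot read a partition backwards, but you can look up the current end offset, seek a fixed distance before it, and read forward while filtering on the key. This assumes kafka-clients 0.10.1+ (for endOffsets); the broker address, topic, partition, device id, and window size are all placeholders.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TailFromEnd {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");              // placeholder broker
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");

        String deviceId = "device-42";                                  // placeholder device id
        TopicPartition tp = new TopicPartition("logs", 0);              // partition holding this device
        long window = 1000;                                             // how far before the end to start

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));

            // Current end offset of the partition (endOffsets requires kafka-clients 0.10.1+).
            long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);

            // Kafka cannot read backwards, so start a fixed window before the end and read forward.
            consumer.seek(tp, Math.max(0, endOffset - window));

            while (consumer.position(tp) < endOffset) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    if (deviceId.equals(record.key())) {
                        System.out.println(record.value());             // keep only this device's logs
                    }
                }
            }
        }
    }
}
```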

Recommended answer

As you are on 0.10.2, I would recommend writing a Kafka Streams application. The application will be stateful, and the state will hold the last N records/logs per device-id; when new data is written to the input topic, the Kafka Streams application just updates its state (without needing to re-read the whole topic).
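A minimal sketch of such a topology, written against a recent Kafka Streams API (on 0.10.2 the builder class is KStreamBuilder and the aggregate overload takes the serde and store name directly). To avoid a custom List serde, this sketch keeps the last N lines as a single newline-joined string; the application id, broker address, store name, and N are placeholder values.

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class LastLogsApp {

    static final int N = 100;                       // how many recent log lines to keep per device
    static final String STORE_NAME = "last-logs";   // queryable state store name (placeholder)

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-last-logs");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // The topic is keyed by device-id, so each device's logs land in one store partition.
        builder.<String, String>stream("logs")
               .groupByKey()
               .aggregate(
                       () -> "",
                       (deviceId, logLine, agg) -> {
                           // Append the new line and keep only the last N lines
                           // (newline-joined string as a stand-in for a proper List serde).
                           String combined = agg.isEmpty() ? logLine : agg + "\n" + logLine;
                           String[] lines = combined.split("\n");
                           if (lines.length <= N) {
                               return combined;
                           }
                           return String.join("\n",
                                   Arrays.copyOfRange(lines, lines.length - N, lines.length));
                       },
                       Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```

The aggregation state is backed by a changelog topic by default, so it survives restarts and the application never has to re-read the full logs topic to rebuild the per-device tail.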

Furthermore, the application can also serve your requests (the /devices/{id}/tail-logs API) using the Interactive Queries feature.
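For the serving side, a hypothetical handler behind /devices/{id}/tail-logs could look up the store materialized in the sketch above via Interactive Queries. StoreQueryParameters is the current API; on older clients the lookup is streams.store(name, type). The store name and the HTTP wiring are assumptions carried over from the previous sketch.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class TailLogsEndpoint {

    private final KafkaStreams streams;

    public TailLogsEndpoint(KafkaStreams streams) {
        this.streams = streams;
    }

    /** Backs GET /devices/{id}/tail-logs (HTTP framework wiring omitted). */
    public String tailLogs(String deviceId) {
        // Query the store materialized by the aggregation in the previous sketch.
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType("last-logs",
                        QueryableStoreTypes.<String, String>keyValueStore()));

        String lastLogs = store.get(deviceId);   // the last N lines, newline-joined
        return lastLogs != null ? lastLogs : "";
    }
}
```

If the application runs as multiple instances, a given device-id lives on only one of them; streams.queryMetadataForKey can tell you which instance hosts the key so the request can be forwarded there.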

Thus, instead of building a stateless application that has to recompute the answer for each request, I would build a stateful application that eagerly computes (and continuously updates) the result for all possible requests (i.e. for all device-ids) and simply returns the already computed result when a request comes in.
