Kafka Consumer-具有较高优先级的主题 [英] Kafka Consumer - topic(s) with higher priority

查看:1150
本文介绍了Kafka Consumer-具有较高优先级的主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Kafka Consumer来阅读多个主题,并且我需要其中一个具有更高的优先级.处理需要花费很多时间,并且(低优先级)主题中总是有很多消息,但是我需要尽快处理来自其他消息的消息.

I am using Kafka Consumer to read from several topics and I need one of those to have higher priority. The processing takes a lot of time and there are always many messages in (low priority) topics, but I need the messages from other one to be processed as soon as possible.

这与> Kafka是否支持主题或消息的优先级类似? ,但这是使用旧的API.

It's similar question as Does Kafka support priority for topic or message? but this one is using old API.

在新的API(0.10.1.1)中,有方法

In new API (0.10.1.1), there are methods

KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)

但是我不清楚,如何有效地检测到高优先级主题中有新消息,并且有必要暂停其他主题中的消费.

But it's not clear to me, how to effectively detect that there are new messages in high priority topic and it is necessary to pause consumption from the other topics.

有什么想法/例子吗?

推荐答案

最后,按照dawsaw的建议,我解决了这一问题-在处理循环中,我存储了从中读取的所有主题/分区:

Finally I solved that, as dawsaw advised - in processing loop, I store for all topics/partitions I read from:

  • 开始偏移
  • endOffsets
  • 已提交-我无法使用位置,因为我订阅主题,而不是分区.

每当对任何优先主题使用(endOffset - commited) > 0时,我就对非优先主题调用consumer.pause(),然后对所有优先主题在(endOffset - commited) == 0之后再次进行恢复.

Whenever (endOffset - commited) > 0 for any priority topic, I call consumer.pause() for non priority topics and resume those again after (endOffset - commited) == 0 for all priority topics.

这篇关于Kafka Consumer-具有较高优先级的主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆