Kafka 是否支持主题或消息的优先级? [英] Does Kafka support priority for topic or message?

查看:69
本文介绍了Kafka 是否支持主题或消息的优先级?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在探索 Kafka 是否支持优先处理任何队列或消息的事实.

它似乎不支持任何这样的东西.我用谷歌搜索并找到了这个邮件档案,它也支持这个:%3referl.comhttp://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOeJiJhVHsr=d6aSTihPsqWVg6vK5xYLam6yMDcd6UAUoXf-DQ@mail.gmail.com%3E

这里有人配置了 Kafka 来优先处理任何主题或消息吗?

解决方案

Kafka 是一种快速、可扩展、分布式的设计、分区和复制提交日志服务.因此没有主题或消息的优先级.

>

我也遇到了和你一样的问题.解决方法很简单.在 kafka 队列中创建主题,比如说:

  1. high_priority_queue

  2. medium_priority_queue

  3. low_priority_queue

在 high_priority_queue 中发布高优先级消息,在 medium_priority_queue 中发布中优先级消息.

现在您可以为所有主题创建 kafka 使用者并打开流.

//这是 Scala 代码val props = new Properties()props.put("group.id", groupId)props.put("zookeeper.connect", zookeeperConnect)val config = new ConsumerConfig(props)val 连接器 = Consumer.create(config)val topicWithStreamCount = Map(high_priority_queue"->1、medium_priority_queue"->1、low_priority_queue"->1)val streamsMap = connector.createMessageStreams(topicWithStreamCount)

您获得每个主题的流.现在您可以先阅读 high_priority 主题,如果主题没有任何消息,然后回退到 medium_priority_queue 主题.如果 medium_priority_queue 为空,则读取 low_priority 队列.

这个技巧对我来说很好用.可能对你有帮助!!

I was exploring the fact whether Kafka supports priority for any queue or message to process.

It seems it doesn't support any such thing. I googled and found this mail archive which supports this also: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOeJiJhVHsr=d6aSTihPsqWVg6vK5xYLam6yMDcd6UAUoXf-DQ@mail.gmail.com%3E

Does anyone here configured of Kafka to prioritize any topic or message?

解决方案

Kafka is a fast, scalable, distributed in nature by its design, partitioned and replicated commit log service.So there is no priority on topic or message.

I also faced same problem that you have.Solution is very simple.Create topics in kafka queue,Let say:

  1. high_priority_queue

  2. medium_priority_queue

  3. low_priority_queue

Publish high priority message in high_priority_queue and medium priority message in medium_priority_queue.

Now you can create kafka consumer and open stream for all topic.

  // this is scala code 
  val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  val config = new ConsumerConfig(props)
  val connector = Consumer.create(config)
  val topicWithStreamCount = Map(
       "high_priority_queue" -> 1,
       "medium_priority_queue" ->  1, 
       "low_priority_queue" -> 1
  )
  val streamsMap = connector.createMessageStreams(topicWithStreamCount)

You get stream of each topic.Now you can first read high_priority topic if topic does not have any message then fallback on medium_priority_queue topic. if medium_priority_queue is empty then read low_priority queue.

This trick is working fine for me.May be helpful for you!!.

这篇关于Kafka 是否支持主题或消息的优先级?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆