将消息流式传输到多个主题 [英] Streaming messages to multiple topics

查看:36
本文介绍了将消息流式传输到多个主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个主主题和多个谓词,每个谓词都有一个与之关联的输出主题.我想将每条记录发送到谓词解析为 true 的所有主题.我正在使用 Luwak 来测试记录满足哪些谓词(使用这个库,你评估一个带有谓词列表的文档,它会告诉你哪些匹配 - 即我只调用一次以获得满足的谓词列表).

我正在尝试为此使用 Kafka Streams,但在 KStream 上似乎没有合适的方法(KStream#branch 仅将记录路由到单个主题).

一种可能的方法如下:

来自 master 的流将值映射为具有原始内容和匹配谓词列表的格式流到中间匹配主题对于每个谓词/输出主题来自中间匹配主题的流过滤器匹配谓词列表是否包含谓词 ID"将值映射到原始内容流到相应的输出主题

这样的中间主题虽然看起来笨拙".有什么更好的建议吗?

我正在使用:

  • Kafka v0.10.1.1
  • Luwak v1.4.0

解决方案

您可以简单地将多个过滤器并行应用到同一个 KStream 实例:

KStream stream = ...stream.filter(new MyPredicate1()).to("output-topic-1");stream.filter(new MyPredicate2()).to("output-topic-2");stream.filter(new MyPredicate3()).to("output-topic-3");//... 需要多少就多少

每条记录将被发送到每个谓词一次——它在概念上是对所有过滤器的广播,但记录不会被物理复制,因此没有内存开销.

I have a single master topic and multiple predicates each of which has an output topic associated with it. I want to send each record to ALL topics that whose predicate resolves to true. I am using Luwak to test which predicates a record satisfies (to use this library you evaluate a document with a list of predicates and it tells you which ones matched - i.e. I only call it once to get the list of satisfied predicates).

I am trying to use Kafka Streams for this but there doesn't seem to be the appropriate method on KStream (KStream#branch only routes a record to a single topic).

One possible approach is as follows:

Stream from master
Map the values into a format with the original content and the list of matching predicates
Stream to an intermediate with-matches topic

For each predicate/output topic
    Stream from intermediate with-matches topic
    Filter "does list of matches predicates contain predicate ID"
    Map the values to just the original content
    Stream to corresponding output topic

Such an intermediate topic seems "clunky" though. Any better suggestions?

I am using:

  • Kafka v0.10.1.1
  • Luwak v1.4.0

解决方案

You can simple apply multiple filters in parallel to the same KStream instance:

KStream stream = ...

stream.filter(new MyPredicate1()).to("output-topic-1");
stream.filter(new MyPredicate2()).to("output-topic-2");
stream.filter(new MyPredicate3()).to("output-topic-3");
// ... as as many as you need

Each record will be sent to each predicate once -- it's conceptually a broadcast to all filters, but records will not be physically replicated, so there is no memory overhead.

这篇关于将消息流式传输到多个主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆