KafkaStreams 同一应用程序中的多个流 [英] KafkaStreams multiple streams in same application

查看:41
本文介绍了KafkaStreams 同一应用程序中的多个流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策.

假设我有两个不同的事件要放入 KTable 中.我有一个生产者将这些消息发送到正在侦听该主题的 KStream.

据我所知,我不能对使用 KafkaStreams 的消息使用条件转发,所以如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to 在单个接收器主题上 - 否则,我将不得不在流上调用 foreach 并使用 KProducer 发送消息到接收器主题.

以上建议使用单个流.我以为我可以在同一个应用程序中设置多个流,每个流监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建 KafkaStreams 的两个实例时,只有第一个初始化的订阅到其主题 - 另一个收到来自客户端的警告,表明其主题没有订阅.

我可以在同一个应用程序中设置多个流吗?如果有,有什么特殊要求吗?

 class Stream(topic: String) {val props: Option[Map[String, String]] = Some(TopicProps.get(topic))val 流生成器 = 新流生成器val 主题 = 新的 util.ArrayList[String]主题.添加(道具.get(主题"))val 流:KStream[String, String] = configureStream(streamsBuilder, topic, props.get(sink"))def configureStream(builder: StreamsBuilder, 主题: java.util.List[String], sink: String): KStream[String, String] = {builder.stream[字符串,字符串](话题,已消耗.`with`(String(), String()))}def init(): KafkaStreams = {val 流 = 新 KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)流开始()流}}类流(){val eventStream = new Stream("first_event")//看起来不错!val eventStream2 = new Stream("second_event")//没有订阅者//如果我切换其中的另一个,则订阅 eventStream2 并且 eventStream 已死在水中val 流:KafkaStreams = eventStream.init()val流2:KafkaStreams = eventStream2.init()}

流配置

 val streamConfig: Properties = {val 属性 = 新属性()properties.put(StreamsConfig.APPLICATION_ID_CONFIG,流应用")properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)特性}

我也喜欢任何建议的替代方案

解决方案

据我所知,我不能对消息使用条件转发

你知道KStream#split() (KStream#branch() 顺序版本)?与条件转发基本相同.

<块引用>

我以为我可以在同一个应用程序中设置多个流,每个流听一个主题,映射并转发到一个表接收器,

这应该如下工作:

StreamsBuilder builder = new SteamsBuilder();KStream stream1 = builder.stream(topic1");KStream stream2 = builder.stream(topic2");stream1.to(table1-topic");stream2.to(table2-topic");

<块引用>

但每次我尝试创建两个 KafkaStreams 实例时,只有第一个初始化订阅其主题 - 另一个从客户端收到警告,表明其主题没有订阅.

不确定.这应该有效.也许你可以分享你的代码?

I'm trying to make a practical design decision based on convention and plausibility with KafkaStreams.

Let's say that I have two different events that I want to place into KTables. I have a producer sending these messages to a KStream that is listening to that topic.

From what I can tell I cannot use conditional forwarding for messages using KafkaStreams, so if the stream is subscribe to many topics (one for each of the above messages, for example) I can only call stream.to on a single sink topic - otherwise, I would have to do something like call foreach on the stream and send messages with a KProducer to the sink topics.

The above suggests using a single stream. I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink, but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.

Can I set up multiple streams in the same app? If so, are there any special requirements?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }

stream config

    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }

I'd also love any alternatives suggested

解决方案

From what I can tell I cannot use conditional forwarding for messages

Do you know about KStream#split() (KStream#branch() in order versions)? It basically the same as conditional forwarding.

I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,

This should work as follows:

StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");

stream1.to("table1-topic");
stream2.to("table2-topic");

but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.

Not sure. This should work. Maybe you can share your code?

这篇关于KafkaStreams 同一应用程序中的多个流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆