KafkaStreams在同一应用程序中多个流 [英] KafkaStreams multiple streams in same application
问题描述
我正在尝试根据惯例和KafkaStreams的合理性做出切实可行的设计决策.
I'm trying to make a practical design decision based on convention and plausibility with KafkaStreams.
假设我有两个不同的事件要放入 KTable
中.我有一个生产者,将这些消息发送到正在监听该主题的 KStream
中.
Let's say that I have two different events that I want to place into KTable
s. I have a producer sending these messages to a KStream
that is listening to that topic.
据我所知,我不能对使用 KafkaStreams
的消息使用条件转发,因此,如果流订阅了许多主题(例如,上述消息中的每一个),则我只能调用在单个接收器主题上使用 stream.to
-否则,我将不得不在流上执行诸如调用 foreach
的操作,并使用 KProducer
发送消息接收器主题.
From what I can tell I cannot use conditional forwarding for messages using KafkaStreams
, so if the stream is subscribe to many topics (one for each of the above messages, for example) I can only call stream.to
on a single sink topic - otherwise, I would have to do something like call foreach
on the stream and send messages with a KProducer
to the sink topics.
以上建议使用单个流.我以为可以在同一个应用程序中设置多个流,每个流都监听一个主题,映射并转发到表接收器,但是每次我尝试创建两个 KafkaStreams
实例时,只有第一个初始化的订阅到其主题-另一个从客户端收到警告,其主题没有订阅.
The above suggests using a single stream. I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink, but everytime I try to create two instances of KafkaStreams
, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.
我可以在同一个应用中设置多个流吗?如果是这样,有什么特殊要求吗?
Can I set up multiple streams in the same app? If so, are there any special requirements?
class Stream(topic: String) {
val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
val streamsBuilder = new StreamsBuilder
val topics = new util.ArrayList[String]
topics.add(props.get("topic"))
val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))
def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
builder.stream[String, String](
topics,
Consumed.`with`(String(), String())
)
}
def init(): KafkaStreams = {
val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)
streams.start()
streams
}
}
class Streams() {
val eventStream = new Stream("first_event") //looking good!
val eventStream2 = new Stream("second_event") // no subscribers
//if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
val streams: KafkaStreams = eventStream.init()
val streams2: KafkaStreams = eventStream2.init()
}
流配置
val streamConfig: Properties = {
val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
properties
}
我也很喜欢建议的其他选择
I'd also love any alternatives suggested
推荐答案
据我所知,我不能对消息使用条件转发
From what I can tell I cannot use conditional forwarding for messages
您了解 KStream#branch()
吗?基本上与条件转发相同.
Do you know about KStream#branch()
? It basically the same as conditional forwarding.
我以为我可以在同一个应用中设置多个流,每个流都监听一个主题,映射并转发到表接收器,
I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,
这应该如下工作:
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");
stream1.to("table1-topic");
stream2.to("table2-topic");
但是每次我尝试创建两个KafkaStreams实例时,只有第一个初始化的实例订阅了它的主题-另一个则从客户端收到一条警告,表明其主题没有订阅.
but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.
不确定.这应该工作.也许您可以共享您的代码?
Not sure. This should work. Maybe you can share your code?
这篇关于KafkaStreams在同一应用程序中多个流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!