Kafka Streams: use the same `application.id` to consume from multiple topics

Problem description

I have an application that needs to listen to multiple different topics; each topic has its own logic for handling messages. I had intended to use the same Kafka properties for every KafkaStreams instance, but I get an error like the one below.

Error

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

Code (Kotlin)

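import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.slf4j.LoggerFactory

class KafkaSetup() {
    companion object {
        // Name the logger after KafkaSetup rather than its companion object
        private val LOG = LoggerFactory.getLogger(KafkaSetup::class.java)
    }

    // Both topologies below receive these properties, so both share application.id "my-app"
    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
        return properties
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { _, _ -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach { _, _ -> LOG.info("do other stuff") }

        // Joins the same consumer group as listenOnMyTopic(), but subscribes to a different topic
        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}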

I found a reference suggesting that you cannot use the same application.id for multiple topics, but I can't find reference documentation to support that. The documentation for application.id states:

An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
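To make the three documented uses concrete, the following Kotlin sketch derives the names that hang off a single application.id. It is illustrative only: `DerivedNames`, `derivedNames`, `changelogTopic`, and the store name `counts-store` are hypothetical, and the changelog pattern `<application.id>-<store name>-changelog` is assumed to match Kafka Streams' internal-topic naming convention. The point it shows is that two unrelated applications sharing one id would also share a consumer group and internal topic names, whereas several instances of the *same* application are meant to share them.

```kotlin
// Illustrative model of the three documented uses of application.id (hypothetical names).
data class DerivedNames(val groupId: String, val clientIdPrefix: String, val internalTopicPrefix: String)

fun derivedNames(applicationId: String) = DerivedNames(
    groupId = applicationId,                 // 2) group.id used for membership management
    clientIdPrefix = applicationId,          // 1) default client.id prefix
    internalTopicPrefix = "$applicationId-"  // 3) prefix of changelog (and other internal) topics
)

// Assumed naming convention for a state store's changelog topic.
fun changelogTopic(applicationId: String, storeName: String) = "$applicationId-$storeName-changelog"

fun main() {
    val firstApp = derivedNames("my-app")   // hypothetical application #1
    val secondApp = derivedNames("my-app")  // unrelated application #2 reusing the same id
    // Both land in the same consumer group, so they would be rebalanced together.
    println(firstApp.groupId == secondApp.groupId)  // true
    println(changelogTopic("my-app", "counts-store")) // my-app-counts-store-changelog
}
```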

Questions

  1. What does this error mean, and what causes it?
  2. Given that you can have multiple instances of your app running with the same id to consume from multiple topic partitions, what does "Must be unique within the Kafka cluster" mean?
  3. Can you use the same Kafka Streams application.id to start two KafkaStreams instances that are listening on different topics? If so, how?

Details: Kafka 0.11.0.2

Recommended answer

Kafka Streams scales via partitions, not topics. Thus, if you start multiple applications with the same application.id, they must be identical with regard to the input topics they subscribe to and their processing logic. The application forms a consumer group using the application.id as its group.id, and thus different partitions of the input topic(s) are assigned to different instances.
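A simplified, self-contained model may help connect this to the error in the question. It assumes, purely for illustration and NOT as a description of Kafka's real `StreamsPartitionAssignor`, that the group leader takes the union of all members' subscriptions and spreads every partition round-robin across the group. With two instances in the group `my-app`, one subscribed to `my-topic` and one to `my-other-topic`, the second instance is handed `my-topic-1` and rejects it, much like the `IllegalArgumentException` above. `Member`, `assign`, and `validate` are hypothetical names.

```kotlin
// Simplified model of a consumer-group rebalance; NOT Kafka's real assignor.
data class Member(val id: String, val subscribedTopics: Set<String>)

// Assumption: the leader spreads every partition of every topic that ANY member
// subscribed to over all members, round-robin, ignoring who subscribed to what.
fun assign(members: List<Member>, partitionsPerTopic: Map<String, Int>): Map<String, List<String>> {
    val allTopics = members.flatMap { it.subscribedTopics }.toSortedSet()
    val partitions = allTopics.flatMap { topic ->
        (0 until partitionsPerTopic.getValue(topic)).map { p -> "$topic-$p" }
    }
    val result = members.associate { it.id to mutableListOf<String>() }
    partitions.forEachIndexed { i, partition ->
        result.getValue(members[i % members.size].id).add(partition)
    }
    return result
}

// Each member rejects an assigned partition whose topic it never subscribed to.
fun validate(member: Member, assigned: List<String>) {
    for (partition in assigned) {
        val topic = partition.substringBeforeLast('-')
        require(topic in member.subscribedTopics) {
            "Assigned partition $partition for non-subscribed topic; subscription is ${member.subscribedTopics}"
        }
    }
}

fun main() {
    val instanceA = Member("instance-A", setOf("my-topic"))
    val instanceB = Member("instance-B", setOf("my-other-topic"))
    val assignment = assign(listOf(instanceA, instanceB), mapOf("my-topic" to 2, "my-other-topic" to 2))
    println(assignment)
    // instance-B receives my-topic-1, so validation throws IllegalArgumentException
    runCatching { validate(instanceB, assignment.getValue("instance-B")) }
        .onFailure { println(it.message) }
}
```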

If you have different topics with the same logic, you can subscribe to all of them at once (in every instance you start). Scaling is still based on partitions, though. (It's basically a "merge" of your input topics.)
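In the 0.11 API a single source can read several topics, e.g. `kStreamBuilder.stream("my-topic", "my-other-topic")`, and every instance must build that same topology. The sketch below models why scaling stays partition-based. It assumes the default partition grouping, in which task *p* bundles partition *p* of every input topic of the source, so two 3-partition topics yield three tasks (not six) to spread across instances. `tasksFor` and `spread` are hypothetical helpers, and the round-robin spread is a simplification of the real assignment.

```kotlin
// Simplified model (not the real assignor). Assumption: like the default partition
// grouping, task p bundles partition p of every input topic of the source.
fun tasksFor(topics: List<String>, partitionsPerTopic: Map<String, Int>): List<List<String>> {
    val maxPartitions = topics.maxOf { partitionsPerTopic.getValue(it) }
    return (0 until maxPartitions).map { p ->
        topics.filter { p < partitionsPerTopic.getValue(it) }.map { topic -> "$topic-$p" }
    }
}

// Round-robin spread of tasks over identical instances (same application.id, same topology).
fun <T> spread(tasks: List<T>, instances: Int): List<List<T>> =
    (0 until instances).map { i -> tasks.filterIndexed { index, _ -> index % instances == i } }

fun main() {
    val topics = listOf("my-topic", "my-other-topic") // identical in every instance
    val tasks = tasksFor(topics, mapOf("my-topic" to 3, "my-other-topic" to 3))
    spread(tasks, instances = 2).forEachIndexed { i, assigned ->
        println("instance-$i -> $assigned")
    }
}
```

With two instances, one gets tasks 0 and 2 and the other gets task 1. Under this model a third instance is the useful maximum, because parallelism is bounded by the partition count rather than by the number of topics.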

If you want to scale via topics and/or have different processing logic, you must use a different application.id for each Kafka Streams application.
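Applied to the question's code, this means no longer sharing one `Properties` object between the two topologies and giving each its own id. Below is a minimal sketch that assumes a broker at `localhost:9092` and uses the hypothetical ids `my-app-my-topic` and `my-app-my-other-topic`. The keys are the literal config names behind `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG`, so the snippet runs without Kafka on the classpath; each result would then be passed to its own `KafkaStreams(kStreamBuilder, props)`.

```kotlin
import java.util.Properties

// Hypothetical helper: one Properties object per logical Kafka Streams application.
// "application.id" and "bootstrap.servers" are the literal keys behind the StreamsConfig constants.
fun streamsProperties(applicationId: String, bootstrapServers: String = "localhost:9092"): Properties =
    Properties().apply {
        put("application.id", applicationId)
        put("bootstrap.servers", bootstrapServers) // assumed broker address
    }

fun main() {
    // Different processing logic, so different application.id values (hypothetical names).
    val myTopicProps = streamsProperties("my-app-my-topic")
    val myOtherTopicProps = streamsProperties("my-app-my-other-topic")
    check(myTopicProps.getProperty("application.id") != myOtherTopicProps.getProperty("application.id"))
    println(myTopicProps.getProperty("application.id"))
    println(myOtherTopicProps.getProperty("application.id"))
}
```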
