Kafka Streams: use the same `application.id` to consume from multiple topics

Problem description

I have an application that needs to listen to multiple different topics; each topic has separate logic for how its messages are handled. I had planned to use the same Kafka properties for each KafkaStreams instance, but I get an error like the one below.

Error

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

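Code (Kotlin)

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.slf4j.LoggerFactory

class KafkaSetup() {
    companion object {
        private val LOG = LoggerFactory.getLogger(KafkaSetup::class.java)
    }

    // Shared by both KafkaStreams instances below, so both use application.id "my-app"
    // and therefore join the same consumer group.
    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        // bootstrap.servers is required (address assumed here); String serdes match KStream<String, String>
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        return properties
    }

    // Starts both listeners; running them in the same group leads to the error above
    fun start() {
        listenOnMyTopic()
        listenOnMyOtherTopic()
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { key, value -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach { key, value -> LOG.info("do other stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}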

I found this reference, which suggests that you cannot use the same application.id for multiple topics; however, I am having trouble finding reference documentation to support that. The documentation for application.id states:

An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
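As a rough illustration of uses 2) and 3), the following Kotlin sketch derives the consumer group id and a changelog topic name from an application.id. The helper names and the store name "counts" are hypothetical; the `<application.id>-<store name>-changelog` format follows the naming convention Kafka Streams documents for its internal topics.

```kotlin
// Hypothetical helpers spelling out two of the three uses of application.id.

// 2) The consumer group id used for membership management is the application.id itself.
fun groupIdFor(applicationId: String): String = applicationId

// 3) State-store changelog topics are prefixed with the application.id
//    (format: <application.id>-<store name>-changelog).
fun changelogTopicFor(applicationId: String, storeName: String): String =
    "$applicationId-$storeName-changelog"
```

Two unrelated applications that both used "my-app" would therefore share one consumer group and, if each had a store named "counts", write to the same "my-app-counts-changelog" topic. Several instances of the same application sharing the id is the intended way to scale out.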

Questions

  1. What does this error mean, and what causes it?
  2. Given that you can have multiple instances of your app running with the same id to consume from multiple topic partitions, what does "Must be unique within the Kafka cluster" mean?
  3. Can you use the same Kafka Streams application.id to start two KafkaStreams instances that are listening on different topics? If so, how?

Details: Kafka 0.11.0.2

Solution

Kafka Streams scales via partitions, not topics. Thus, if you start multiple applications with the same application.id, they must be identical with regard to the input topics they subscribe to and their processing logic. The application forms a consumer group using the application.id as group.id, and thus different partitions of the input topic(s) are assigned to different instances.
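To see how a shared group.id can produce the error from the question, here is a toy simulation in Kotlin. It is not Kafka's real assignor (StreamsPartitionAssignor is considerably more involved); it only assumes, in line with the answer, that all members of one group are treated as running the same topology, so the partitions of every subscribed topic are spread across every member. Member, assign, unexpectedPartitions and simulate are hypothetical names, and two partitions per topic is an assumption.

```kotlin
// Toy model of a consumer group, NOT Kafka's real StreamsPartitionAssignor.
// Assumption: the group treats all members as interchangeable, so the partitions
// of every topic subscribed anywhere in the group are spread over all members.

data class Member(val name: String, val subscribedTopic: String)

// Round-robin the partitions of all subscribed topics across all members.
fun assign(members: List<Member>, partitionsPerTopic: Map<String, Int>): Map<Member, List<String>> {
    val allPartitions = members.map { it.subscribedTopic }.distinct().sorted()
        .flatMap { topic -> (0 until partitionsPerTopic.getValue(topic)).map { p -> "$topic-$p" } }
    return members.withIndex().associate { (idx, member) ->
        member to allPartitions.filterIndexed { i, _ -> i % members.size == idx }
    }
}

// Partitions a member received that belong to a topic it never subscribed to.
fun unexpectedPartitions(member: Member, assigned: List<String>): List<String> =
    assigned.filter { it.substringBeforeLast('-') != member.subscribedTopic }

// Two "applications" sharing one group id, as in the question's code.
fun simulate(): List<String> {
    val members = listOf(
        Member("listenOnMyTopic", "my-topic"),
        Member("listenOnMyOtherTopic", "my-other-topic")
    )
    val assignment = assign(members, mapOf("my-topic" to 2, "my-other-topic" to 2))
    return assignment.flatMap { (member, partitions) ->
        unexpectedPartitions(member, partitions).map { p ->
            "Assigned partition $p for non-subscribed topic; subscription is ${member.subscribedTopic}"
        }
    }
}
```

With two partitions per topic, simulate() reports my-topic-1 landing on the member subscribed to my-other-topic, which mirrors the error in the question; the other member likewise receives a partition of a topic it never asked for.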

If you have different topics with the same logic, you can subscribe to all topics at once (in each instance you start). Scaling is still based on partitions, though. (It's basically a "merge" of your input topics.)
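A minimal sketch of the "merge" approach, using the 0.11-era KStreamBuilder API from the question (KStreamBuilder was replaced by StreamsBuilder in Kafka 1.0). The application id "my-merged-app", the broker address localhost:9092, and the function names are assumptions for illustration.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder

// Hypothetical application id and broker address, chosen for illustration.
fun mergedAppProperties(): Properties {
    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-merged-app")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    return props
}

// One topology that reads both topics as a single merged stream.
// Every instance started with "my-merged-app" must run exactly this code.
fun buildMergedTopology(): KStreamBuilder {
    val builder = KStreamBuilder()
    val merged: KStream<String, String> = builder.stream("my-topic", "my-other-topic")
    merged.foreach { key, value -> println("same logic for $key -> $value") }
    return builder
}

// Starts one instance; start more with identical code to spread the partitions.
fun startMergedApp(): KafkaStreams {
    val streams = KafkaStreams(buildMergedTopology(), mergedAppProperties())
    streams.start()
    return streams
}
```

Because all instances share one topology, the group can hand any partition of either topic to any instance without a subscription mismatch.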

If you want to scale via topics and/or have different processing logic, you must use a different application.id for each Kafka Streams application.
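A sketch of the separate-application approach, again assuming the 0.11 KStreamBuilder API, a broker at localhost:9092, and a hypothetical naming scheme that derives each application.id from its topic. Each topology gets its own KafkaStreams instance and therefore its own consumer group, so the logic per topic can differ.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder

// Properties for one Streams application; only application.id differs between apps.
fun propertiesFor(applicationId: String): Properties {
    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    return props
}

// Builds and starts a separate application for one topic with its own application.id.
// Keys and values are nullable because Kafka records may carry null keys or values.
fun startListener(topic: String, applicationId: String, handle: (String?, String?) -> Unit): KafkaStreams {
    val builder = KStreamBuilder()
    val stream: KStream<String, String> = builder.stream(topic)
    stream.foreach { key, value -> handle(key, value) }
    val streams = KafkaStreams(builder, propertiesFor(applicationId))
    streams.start()
    return streams
}

// Hypothetical ids: one application per topic, each with its own logic.
fun startAll(): List<KafkaStreams> = listOf(
    startListener("my-topic", "my-app-my-topic") { _, _ -> println("do stuff") },
    startListener("my-other-topic", "my-app-my-other-topic") { _, _ -> println("do other stuff") }
)
```

Each listener can then be scaled on its own by starting more instances with the same application.id and the same code.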
