Kafka only subscribe to latest message?


Question

Sometimes (it seems very random) Kafka sends old messages. I only want the latest messages, so that a message overwrites earlier messages with the same key. Currently it looks like I have multiple messages with the same key that don't get compacted.

I use this setting on the topic:

cleanup.policy=compact
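
For reference, a minimal sketch of creating a topic with this policy programmatically via the Kafka AdminClient; the topic name, partition count, replication factor, and bootstrap address below are placeholders, not values from the question:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

fun createCompactedTopic() {
    val adminProps = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    }
    AdminClient.create(adminProps).use { admin ->
        // Compaction is enabled per topic through its config map.
        val topic = NewTopic("my-topic", 3, 1.toShort())
                .configs(mapOf("cleanup.policy" to "compact"))
        admin.createTopics(listOf(topic)).all().get()
    }
}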

I'm using Java/Kotlin with the Apache Kafka 1.1.1 client.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.Properties

val props = Properties(8).apply {
    // JAAS configuration for SCRAM authentication
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)

    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG, "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)

    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}
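
For context, a minimal consumption loop using the properties above; the topic name is an assumption:

import org.apache.kafka.clients.consumer.KafkaConsumer

val consumer = KafkaConsumer<String, String>(props)
consumer.subscribe(listOf("my-topic")) // placeholder topic name
while (true) {
    // Kafka 1.1.1 client: poll(timeoutMs); newer clients take a Duration
    val records = consumer.poll(1000L)
    for (record in records) {
        println("offset=${record.offset()} key=${record.key()} value=${record.value()}")
    }
}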

Am I missing a setting?

Answer

If you want to have only one value for each key, you have to use the KTable<K,V> abstraction: StreamsBuilder::table(final String topic) from Kafka Streams. The topic used here should have its cleanup policy set to compact.
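
A minimal Kotlin sketch of this approach, assuming string keys and values; the application id, bootstrap address, and topic name are placeholders:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties

fun main() {
    val streamsProps = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-per-key-app") // placeholder
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    }
    val builder = StreamsBuilder()
    // table() reads the topic as a changelog: each key is materialized to its latest value.
    builder.table<String, String>("my-topic") // placeholder topic name
            .toStream()
            .foreach { key, value -> println("$key -> $value") }
    KafkaStreams(builder.build(), streamsProps).start()
}

The KTable's local state store keeps only the newest value per key, regardless of whether the broker has already compacted the underlying log.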

If you use KafkaConsumer, you just pull data from the brokers. It doesn't give you any mechanism for performing deduplication. Depending on whether compaction has been performed yet, you can get one to n messages for the same key.
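
If you stick with a plain consumer, deduplication has to happen on the client. A hedged sketch that collapses each polled batch to the latest value per key (the handle callback is a placeholder, not part of the Kafka API):

import org.apache.kafka.clients.consumer.KafkaConsumer

fun pollLatestPerKey(consumer: KafkaConsumer<String, String>, handle: (String, String) -> Unit) {
    val records = consumer.poll(1000L) // Kafka 1.1.1 client: poll(timeoutMs)
    val latest = LinkedHashMap<String, String>()
    for (record in records) {
        // Within a partition records arrive in offset order, so for keys that
        // always land on the same partition, later values overwrite earlier ones.
        latest[record.key()] = record.value()
    }
    latest.forEach { (key, value) -> handle(key, value) }
}

Note this only deduplicates within a single batch; duplicates split across polls would still be delivered separately.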

About compaction

Compaction doesn't mean that all old values for the same key are removed immediately. When an old message for the same key is actually removed depends on several properties. The most important are listed below; a sketch of adjusting their per-topic counterparts follows the list.

  • log.cleaner.min.cleanable.ratio — the minimum ratio of dirty log to total log for a log to be eligible for cleaning.

  • log.cleaner.min.compaction.lag.ms — the minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

  • log.cleaner.enable — enables the log cleaner process to run on the server. Should be enabled if using any topics with cleanup.policy=compact, including the internal offsets topic. If disabled, those topics will not be compacted and will continually grow in size.
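
The log.cleaner.* settings above are broker-level; the first two have per-topic counterparts (min.cleanable.dirty.ratio and min.compaction.lag.ms) that can be overridden per topic. A hedged sketch using the AdminClient API available in the 1.1.1 client; the topic name and values are placeholders:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.Config
import org.apache.kafka.clients.admin.ConfigEntry
import org.apache.kafka.common.config.ConfigResource

fun tightenCompaction(admin: AdminClient) {
    val resource = ConfigResource(ConfigResource.Type.TOPIC, "my-topic") // placeholder
    // Caution: alterConfigs replaces the full topic config, so include every
    // override you want to keep (newer clients offer incrementalAlterConfigs).
    val config = Config(listOf(
            ConfigEntry("cleanup.policy", "compact"),
            ConfigEntry("min.cleanable.dirty.ratio", "0.1"), // compact sooner
            ConfigEntry("min.compaction.lag.ms", "0")
    ))
    admin.alterConfigs(mapOf(resource to config)).all().get()
}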

You can find more detail about compaction at https://kafka.apache.org/documentation/#compaction

