Kafka only subscribe to latest message?


Problem description

Sometimes (seemingly at random) Kafka sends old messages. I only want the latest message, so a newer message should overwrite older messages with the same key. Currently it looks like I have multiple messages with the same key, and the topic doesn't get compacted.

I use this setting on the topic:

cleanup.policy=compact
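For context, this topic-level setting can also be applied programmatically. The following is a minimal sketch using the Kafka AdminClient; the topic name and bootstrap servers are placeholders, not values from the question:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.Config
import org.apache.kafka.clients.admin.ConfigEntry
import org.apache.kafka.common.config.ConfigResource
import java.util.Properties

// Sketch: set cleanup.policy=compact on an existing topic.
// "my-topic" and the bootstrap servers are placeholders.
fun main() {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")
    }
    AdminClient.create(props).use { admin ->
        val resource = ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
        val config = Config(listOf(ConfigEntry("cleanup.policy", "compact")))
        admin.alterConfigs(mapOf(resource to config)).all().get()
    }
}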

I'm using Java/Kotlin with the Apache Kafka 1.1.1 client.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.Properties

// Consumer properties: SASL_SSL with SCRAM-SHA-256 authentication.
// Configuration.* and BOOTSTRAP_SERVERS are the application's own constants.
Properties(8).apply {
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG, "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)

    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}

Have I missed some settings?

Recommended answer

If you want to have only one value for each key, you have to use the KTable<K,V> abstraction: StreamsBuilder::table(final String topic) from Kafka Streams. The topic used here should have its cleanup policy set to compact.
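For illustration, a minimal sketch of this approach with Kafka Streams follows; the application id, bootstrap servers, topic name and String serdes are assumptions made for the example:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-value-per-key")  // assumed application id
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // assumed brokers
    }

    val builder = StreamsBuilder()
    // table() reads the topic as a changelog: a later record with the same key
    // replaces the earlier one, so the KTable holds the latest value per key.
    val table = builder.table("my-compacted-topic",                       // assumed topic name
            Consumed.with(Serdes.String(), Serdes.String()))
    table.toStream().foreach { key, value -> println("$key -> $value") }

    KafkaStreams(builder.build(), props).start()
}

With this, even if the underlying topic still contains several records for a key (because compaction hasn't run yet), the KTable only ever exposes the most recent value.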

If you use KafkaConsumer, you just pull data from the brokers. It doesn't give you any mechanism that performs any kind of deduplication. Depending on whether compaction has been performed or not, you can get anywhere from one to n messages for the same key.
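If you stay on the plain consumer, any deduplication therefore has to be done on the client side. A minimal sketch, assuming a KafkaConsumer<String, String> built from the properties shown in the question and already subscribed to the topic:

import org.apache.kafka.clients.consumer.KafkaConsumer

// Sketch: collapse one polled batch to the latest value per key.
// `consumer` is assumed to be created from the question's properties and
// already subscribed to the compacted topic.
fun latestPerKey(consumer: KafkaConsumer<String, String>): Map<String, String> {
    val latest = LinkedHashMap<String, String>()
    val records = consumer.poll(1000)           // poll(long) is the API in the 1.1.1 client
    for (record in records) {
        latest[record.key()] = record.value()   // later records overwrite earlier ones
    }
    return latest
}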

Regarding compaction

Compaction doesn't mean that all old values for the same key are removed immediately. When an old message for a key is actually removed depends on several properties. The most important are listed below; a topic-level configuration sketch follows the list.

  • log.cleaner.min.cleanable.ratio
    The minimum ratio of dirty log to total log for a log to be eligible for cleaning.

  • log.cleaner.min.compaction.lag.ms
    The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

  • log.cleaner.enable
    Enable the log cleaner process to run on the server. This should be enabled if any topic uses cleanup.policy=compact, including the internal offsets topic. If disabled, those topics will not be compacted and will continually grow in size.
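As the sketch referenced above, the topic-level counterparts of these broker settings (min.cleanable.dirty.ratio and min.compaction.lag.ms) can also be set per topic. The topic name, partition and replication counts, and the values below are examples chosen to make compaction run aggressively, not recommendations:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

// Sketch: create a compacted topic whose per-topic settings mirror the
// broker-level log.cleaner.* properties listed above. All values are examples.
fun main() {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")
    }
    AdminClient.create(props).use { admin ->
        val topic = NewTopic("my-compacted-topic", 3, 1.toShort()).configs(mapOf(
                "cleanup.policy" to "compact",
                "min.cleanable.dirty.ratio" to "0.1",  // compact even when little of the log is dirty
                "min.compaction.lag.ms" to "0"         // records become compactable immediately
        ))
        admin.createTopics(listOf(topic)).all().get()
    }
}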

You can find more details about compaction at https://kafka.apache.org/documentation/#compaction

