Does changing the Kafka Streams topology (adding a repartition step) have any effect on message processing guarantees?


Question

Let's say I want to make some transformation 'A' configurable. This transformation manages some state using a state store and also requires repartitioning, which means repartitioning happens only when 'A' is enabled. Now suppose I run the application 3 times (possibly as rolling upgrades) in the following way (or any other combination):

  1. Transformation 'A' is disabled

  2. Transformation 'A' is enabled

  3. Transformation 'A' is disabled

Given that all 3 runs use the same cluster of Kafka brokers:

  1. If EOS (exactly-once semantics) is enabled, will the EOS guarantee hold across all 3 runs?

  2. If EOS is not enabled, is there a case that may cause message loss (that is, failing to provide even at-least-once)?

The topology code, to give a better understanding of what I am trying to do:

KStream<String, Cab> kStream = getStreamsBuilder()
        .stream("topic_a", Consumed.with(keySerde, valueSerde))
        .transformValues(() -> transformer1)
        .transformValues(() -> transformer2, "stateStore_a")
        .flatMapValues(events -> events);

mayBeEnrichAgain(kStream, keySerde, valueSerde)
        .selectKey((ignored, event) -> event.getAnotherId())
        .through(INTERMEDIATE_TOPIC_2, Produced.with(keySerde, valueSerde)) // this repartitioning will always be there
        .transformValues(() -> transformer3, "stateStore_b")
        .to(txStreamsConfig.getAlertTopic(), Produced.with(keySerde, valueSerde));

private <E extends Cab> KStream<String, E> mayBeEnrichAgain(final KStream<String, E> kStream,
        final Serde<String> keySerde,
        final Serde<E> valueSerde) {

    if (enrichmentEnabled) { // repartitioning is configurable
        return kStream.selectKey((ignored, event) -> event.id())
                .through(INTERMEDIATE_TOPIC_1, Produced.with(keySerde, valueSerde))
                .transformValues(enricher1)
                .transformValues(enricher2);
    } else {
        return kStream;
    }
}

Recommended answer

You cannot simply change the topology without potentially breaking it.

It is hard to say in general whether inserting the through-topic will break the application in the first place.

If it does not break, you might still "lose" data when you remove the topic again: some unprocessed records might still be sitting in that topic, and once the topic is removed from the topology, the application will no longer read them.

In general, you should either reset the application cleanly or use a new application.id when you upgrade your app to a newer version that changes the structure of the topology.
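As a rough illustration of the "new application.id" option, here is a minimal sketch that derives the id from a topology version bumped by hand on every structural change (for example, each time the enrichment step is toggled). The class name, the helper methods, the base name "cab-alerts", and the broker address are all assumptions made up for this example; only the config keys application.id, bootstrap.servers and processing.guarantee are real Kafka Streams settings. Note that an application started under a new id has no committed offsets and no existing state stores, so it does not resume where the previous version stopped.

```java
import java.util.Properties;

public class VersionedStreamsConfig {

    // Hypothetical helper: combines a base name with a topology version
    // that the operator increments whenever the topology structure changes.
    static String applicationId(String baseName, int topologyVersion) {
        return baseName + "-v" + topologyVersion;
    }

    // Builds the properties that would be passed to the KafkaStreams constructor.
    static Properties buildConfig(String baseName, int topologyVersion, boolean eosEnabled) {
        Properties props = new Properties();
        props.put("application.id", applicationId(baseName, topologyVersion));
        // Assumed broker address, replace with the real cluster.
        props.put("bootstrap.servers", "localhost:9092");
        // "exactly_once" matches the Kafka versions that still offer through();
        // newer releases use "exactly_once_v2" instead.
        props.put("processing.guarantee", eosEnabled ? "exactly_once" : "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        // Three runs from the question: disabled, enabled, disabled.
        // Each structural change gets its own version and therefore its own id.
        for (int version = 1; version <= 3; version++) {
            Properties props = buildConfig("cab-alerts", version, true);
            System.out.println(props.getProperty("application.id"));
        }
    }
}
```

With this scheme the third run (enrichment disabled again) does not reuse the id of the first run, so it does not pick up stale offsets or state left behind by an older topology.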
