Does changing the Kafka Streams topology (adding a repartition step) have any effect on message processing guarantees?


Question

Let's say I want to make some transformation 'A' configurable. This transformation manages some state using a state store and also requires repartitioning, which means repartitioning happens only if the transformation is enabled. Now suppose I run the application 3 times (possibly as a rolling upgrade) in the following way (or any other combination):

  1. Transformation 'A' is disabled
  2. Transformation 'A' is enabled
  3. Transformation 'A' is disabled

Given that all 3 runs use the same cluster of Kafka brokers:

  1. If EOS (exactly-once semantics) is enabled, does the EOS guarantee hold across all 3 runs?
  2. If EOS is not enabled, is there a case that may cause message loss (that is, failing to provide even at-least-once)?

The topology code, to give a better understanding of what I am trying to do:

KStream<String, Cab> kStream = getStreamsBuilder()
        .stream("topic_a", Consumed.with(keySerde, valueSerde))
        .transformValues(() -> transformer1)
        .transformValues(() -> transformer2, "stateStore_a")
        .flatMapValues(events -> events);

mayBeEnrichAgain(kStream, keySerde, valueSerde)
        .selectKey((ignored, event) -> event.getAnotherId())
        .through(INTERMEDIATE_TOPIC_2, Produced.with(keySerde, valueSerde)) // this repartitioning will always be there
        .transformValues(() -> transformer3, "stateStore_b")
        .to(txStreamsConfig.getAlertTopic(), Produced.with(keySerde, valueSerde));

private <E extends Cab> KStream<String, E> mayBeEnrichAgain(final KStream<String, E> kStream,
        final Serde<String> keySerde,
        final Serde<E> valueSerde) {

    if (enrichmentEnabled) { // repartitioning is configurable
        return kStream.selectKey((ignored, event) -> event.id())
                .through(INTERMEDIATE_TOPIC_1, Produced.with(keySerde, valueSerde))
                .transformValues(enricher1)
                .transformValues(enricher2);
    } else {
        return kStream;
    }
}

Recommended answer

You cannot simply change the topology without potentially breaking the application.

It is hard to say in general whether inserting the through-topic will break the application in the first place.

If it does not break, you might still "lose" data when you later remove the topic: some unprocessed records may remain in it, and once the topic is no longer part of the topology, those records are never read.
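To make the stranded-records risk concrete, here is a minimal sketch (not part of the original answer) of a lag check on the intermediate topic. The class `IntermediateTopicLag` and its methods are hypothetical, and the offset maps are hard-coded for illustration; in a real deployment they would have to be fetched from the cluster, for example by describing the consumer group whose name equals the application.id. Treating a missing committed offset as 0 is a simplifying assumption.

```java
import java.util.Map;

// Hypothetical helper: estimates how many records in an intermediate topic
// have been written but not yet consumed and committed by the application.
final class IntermediateTopicLag {

    // Sums (end offset - committed offset) over all partitions.
    // Assumption: a partition with no committed offset counts as committed = 0.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0L;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0L, entry.getValue() - committed);
        }
        return lag;
    }

    // True when every record written to the topic has been consumed and committed.
    static boolean isDrained(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        return totalLag(endOffsets, committedOffsets) == 0L;
    }

    public static void main(String[] args) {
        // Illustrative offsets keyed by partition number (made-up values).
        Map<Integer, Long> endOffsets = Map.of(0, 120L, 1, 80L);
        Map<Integer, Long> committed = Map.of(0, 120L, 1, 75L);

        System.out.println(totalLag(endOffsets, committed));  // prints 5
        System.out.println(isDrained(endOffsets, committed)); // prints false
    }
}
```

A non-zero lag at the moment the repartition step is disabled would correspond to records that the new topology never reads. A zero lag only says the topic was drained at that instant; it does not make the topology change safe in general.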

In general, if you upgrade your app to a newer version that changes the structure of the topology, you should either reset the application cleanly or use a new application.id.
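One way to follow the "new application.id" advice is to derive the id from a version number that is bumped on every structural change to the topology. The sketch below is only an illustration: the class `VersionedAppId`, the base name `cab-alerts`, and the `-vN` naming scheme are assumptions, not something from the question. The only Kafka-specific detail it relies on is the `application.id` config key. Keep in mind that a new application.id also means a new consumer group with no committed offsets and empty state stores, so the new version starts from scratch.

```java
import java.util.Properties;

// Hypothetical helper: builds Kafka Streams properties whose application.id
// changes whenever the topology structure changes.
final class VersionedAppId {

    // Assumed naming scheme: "<baseId>-v<topologyVersion>".
    static String applicationId(String baseId, int topologyVersion) {
        if (topologyVersion < 1) {
            throw new IllegalArgumentException("topologyVersion must be >= 1");
        }
        return baseId + "-v" + topologyVersion;
    }

    static Properties streamsProps(String baseId, int topologyVersion) {
        Properties props = new Properties();
        // "application.id" is the Kafka Streams configuration key for the application id.
        props.setProperty("application.id", applicationId(baseId, topologyVersion));
        return props;
    }

    public static void main(String[] args) {
        // The three runs from the question, each treated as a new topology version.
        for (int version = 1; version <= 3; version++) {
            System.out.println(streamsProps("cab-alerts", version).getProperty("application.id"));
        }
        // prints cab-alerts-v1, cab-alerts-v2, cab-alerts-v3 (one per line)
    }
}
```

Using a monotonically increasing version, rather than a name per configuration, avoids run 3 silently resuming the offsets and state that run 1 left behind, even though both runs have transformation 'A' disabled.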
