Recovering state consistency in Flink when using Kafka as EventStore


Problem description

I am implementing a microservice as an event-sourcing aggregate which, in turn, is implemented as a Flink FlatMapFunction. In the basic setup, the aggregate reads events and commands from two Kafka topics. It then writes new events back to that first topic and the processing results to a third topic. Kafka therefore acts as the event store. I hope this drawing helps:

  RPC Request                              RPC Result
  |                                                 |
  ~~~~> Commands-|              |---> Results ~~~~~~|
                 |-->Aggregate--|
  ~> Input evs. -|              |---> output evs. ~~~
  |                                                 |
  ~~~~~<~~~~~~~~~~~<~~feedback loop~~~~~<~~~~~~~~<~~~
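The aggregate's event-sourcing contract (replay input events to rebuild state, turn commands into new events plus a result) can be modeled independently of Flink. A minimal plain-Java sketch of that contract, using a hypothetical counter aggregate (CounterAggregate, replay, and handle are illustrative names, not part of any Flink or Kafka API):

```java
import java.util.Optional;

// Hypothetical event-sourced counter aggregate: state is rebuilt by
// replaying events from the event topic; commands produce new events.
class CounterAggregate {
    private long count = 0;

    // Replaying an input event mutates state but emits nothing new.
    void replay(String event) {
        if (event.startsWith("Incremented:")) {
            count += Long.parseLong(event.substring("Incremented:".length()));
        }
    }

    // A command is validated against current state; on success it yields
    // the event that would be appended to the event topic.
    Optional<String> handle(String command) {
        if (command.startsWith("Increment:")) {
            long by = Long.parseLong(command.substring("Increment:".length()));
            String event = "Incremented:" + by;
            replay(event); // apply the new event to local state immediately
            return Optional.of(event);
        }
        return Optional.empty();
    }

    long state() { return count; }
}
```

The consistency problem described below arises precisely because the replayed events and the freshly produced events travel through the same Kafka topic.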

Because Kafka is not checkpointed, commands could potentially be replayed twice, and it seems that output events could also be written to the topic twice.

How could the state be recovered in these cases of repeated messages? Is it possible for the aggregate to know when its input streams are up to date, so that it can start processing commands?

I have thought of several solutions:

  1. If Flink implemented rollback of unconfirmed events, a sink could be built that obtains the current offset from the event source. On restart, this sink would remove the newer-than-offset events from the Kafka topic. This way, the KafkaSource and KafkaSink would be generated from the same builder and then exposed to the topology. This solution has a serious problem: other services could already have read the newer events in the topic, causing inconsistency.

  2. If removing events from Flink (as option 1 requires) is not possible, a stateful source could read events from the recorded offset onward, try to match the repeated events inside the aggregate, and drop them. This option does not seem robust: such patches may not be deterministic and are prone to flaws, since they would have to be rethought for each aggregate and topology, and they would not guarantee recovery (e.g. in the case of consecutive restarts). So this is a bad solution.
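The event matching in option 2 boils down to remembering which events have already been applied and dropping replays. A minimal sketch, assuming events carry a unique id (DedupFilter and the id scheme are hypothetical):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative dedup filter: the aggregate remembers the ids of events
// it has already applied and refuses to apply them a second time.
class DedupFilter {
    private final Set<String> seen = new HashSet<>();

    // Returns true if the event is new and should be applied,
    // false if it is a replayed duplicate and must be dropped.
    boolean admit(String eventId) {
        return seen.add(eventId);
    }
}
```

Note that the seen-set itself would have to survive restarts, which is exactly the kind of per-topology special case that makes this option fragile.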

  3. A different approach is this one: create a special KafkaSource with two special watermarks. The first, KafkaSourceStartedWatermark, is always sent at source startup to notify dependent operators; when this watermark is sent, the source internally records the current Kafka offset. The second, KafkaSourceUpToDateWatermark, is sent by the source once that offset is reached. These watermarks would travel along the topology transparently. Operators should be able to handle these watermarks by implementing a special WatermarkNotifiable interface. The aggregate would then be able to buffer or drop RPC commands until it is up to date in every input source.

interface WatermarkNotifiable {
    void started(String watermarkId);   // KafkaSourceStartedWatermark
    void upToDate(String watermarkId);  // KafkaSourceUpToDateWatermark
}
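One way the interface above could be used: the aggregate buffers incoming RPC commands until every source that announced itself via started() has also reported upToDate(). A plain-Java sketch (GatedCommandHandler is a hypothetical name; the interface is repeated so the sketch is self-contained):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// The interface from the question, repeated here for self-containment.
interface WatermarkNotifiable {
    void started(String watermarkId);   // KafkaSourceStartedWatermark
    void upToDate(String watermarkId);  // KafkaSourceUpToDateWatermark
}

// Hypothetical aggregate wrapper: commands are buffered until every
// source that announced itself at startup has reported up-to-date.
class GatedCommandHandler implements WatermarkNotifiable {
    private final Set<String> pending = new HashSet<>();
    private final Queue<String> buffered = new ArrayDeque<>();
    private boolean anyStarted = false;

    @Override public void started(String watermarkId) {
        anyStarted = true;
        pending.add(watermarkId);
    }

    @Override public void upToDate(String watermarkId) {
        pending.remove(watermarkId);
    }

    private boolean ready() { return anyStarted && pending.isEmpty(); }

    // While recovering, commands are buffered and nothing is emitted;
    // once all sources are up to date, the backlog is released together
    // with the new command.
    List<String> onCommand(String command) {
        if (!ready()) {
            buffered.add(command);
            return List.of();
        }
        List<String> toProcess = new ArrayList<>(buffered);
        buffered.clear();
        toProcess.add(command);
        return toProcess;
    }
}
```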

  4. If implementing the infrastructure in 3 is not possible, the KafkaSource could implement a constructor that specifies a special watermark event able to travel to the operators, but this would require all operators to depend on these watermarks and re-emit them.

  5. Another approach is to not process commands older than some criterion. For example, commands carry an entry timestamp. If time is used, time synchronization becomes critical.
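The timestamp criterion from option 5 can be sketched as a simple cutoff: commands stamped before the job's restart time are treated as replays and dropped (TimestampGate is a hypothetical name; clock skew between producers is what makes the synchronization caveat above critical):

```java
// Illustrative cutoff: any command whose entry timestamp predates the
// moment the job (re)started is assumed to be a replayed duplicate.
class TimestampGate {
    private final long startedAtMillis;

    TimestampGate(long startedAtMillis) {
        this.startedAtMillis = startedAtMillis;
    }

    // true = process the command, false = drop it as a replay.
    boolean accept(long commandTimestampMillis) {
        return commandTimestampMillis >= startedAtMillis;
    }
}
```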

Related StackOverflow questions:

    1. Using Kafka as a (CQRS) Eventstore. Good idea?
    2. Kafka - Know if Consumer is up to date
    3. Kafka & Flink duplicate messages on restart

Answer

Create a new "Conmuter" operator type. It behaves like a source and is composed of several sources representing the event and command topics. It starts in a "recovering" state, in which it reads from the event topics up to their latest offsets while buffering or dropping the commands. Once up to date, it considers itself recovered and "opens" the way to commands. It could alternatively be implemented as a source plus an operator.
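The recovering/open state machine described here can be modeled in plain Java, assuming the operator knows the last event offset recorded before the restart (the Conmuter class and its methods are illustrative, not Flink API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the "Conmuter": it stays in RECOVERING until
// the event stream has been replayed up to the recorded target offset,
// buffering commands in the meantime.
class Conmuter {
    enum Mode { RECOVERING, OPEN }

    private final long targetOffset;
    private final Queue<String> bufferedCommands = new ArrayDeque<>();
    private Mode mode = Mode.RECOVERING;

    Conmuter(long targetOffset) { this.targetOffset = targetOffset; }

    // Each replayed event advances the offset; reaching the target
    // flips the operator to OPEN.
    void onEvent(long offset) {
        if (mode == Mode.RECOVERING && offset >= targetOffset) {
            mode = Mode.OPEN;
        }
    }

    // While recovering, commands are buffered; once open, the backlog
    // is released together with the new command.
    List<String> onCommand(String command) {
        if (mode == Mode.RECOVERING) {
            bufferedCommands.add(command);
            return List.of();
        }
        List<String> out = new ArrayList<>(bufferedCommands);
        bufferedCommands.clear();
        out.add(command);
        return out;
    }

    Mode mode() { return mode; }
}
```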

FlinkKafkaProducerXX is not enough to do this, but it would be the base on which to implement it.

