Kafka Streams use cases for addGlobalStore


Problem description

When defining a topology in Kafka Streams, a global state store can be added. It needs a source topic as well as a ProcessorSupplier. The processor receives records and could theoretically transform them before adding them to the store. But in the case of restoration, the records are inserted directly from the source topic (the changelog) into the global state store, skipping any transformation done in the processor.

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier) Adds a global StateStore to the topology.
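
Below is a minimal sketch of how this overload can be wired up. It is only an illustration: the names "source-topic" and "global-store", and the String serdes, are placeholders, and it uses the older Processor API that matches the signature quoted above. The supplied processor only keeps the store up to date, as the documentation requires.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalStoreTopology {

        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();

            // Global stores must have changelogging disabled:
            // the source topic itself serves as the changelog.
            StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                    Stores.keyValueStoreBuilder(
                            Stores.inMemoryKeyValueStore("global-store"),
                            Serdes.String(),
                            Serdes.String())
                          .withLoggingDisabled();

            builder.addGlobalStore(
                    storeBuilder,
                    "source-topic",
                    Consumed.with(Serdes.String(), Serdes.String()),
                    () -> new Processor<String, String>() {
                        private KeyValueStore<String, String> store;

                        @Override
                        @SuppressWarnings("unchecked")
                        public void init(final ProcessorContext context) {
                            store = (KeyValueStore<String, String>) context.getStateStore("global-store");
                        }

                        @Override
                        public void process(final String key, final String value) {
                            // Only mirror the record into the store; any transformation
                            // done here would be skipped during restoration.
                            if (value == null) {
                                store.delete(key);
                            } else {
                                store.put(key, value);
                            }
                        }

                        @Override
                        public void close() { }
                    });

            return builder;
        }
    }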

According to the documentation:

NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date.

In parallel, a major bug is currently open on the Kafka bug tracker: KAFKA-7663 "Custom Processor supplied on addGlobalStore is not used when restoring state from topic", which describes exactly what is stated in the documentation, but seems to be an accepted bug.

I am wondering whether KAFKA-7663 is indeed a bug. According to the documentation, it seems to have been designed this way, in which case I struggle to understand the use case.
Can someone explain the major use cases of this low-level API? The only thing I can think of is processing side effects, for example doing some logging in the processor.

Bonus question: If the source topic acts as the changelog of the global store, when a record is deleted from the topic because retention has expired, will it be removed from the global state store? Or will the removal only take place in the store after a full store restoration from the changelog?

Recommended answer

Yeah, this is quite a weird little catch-22, but the documentation is correct. The Processor for a global state store must not do anything to the records but persist them into the store.

AFAIK, this isn't a philosophical issue, just a practical one. The reason is simply the behavior you observe... Streams treats the input topic as a changelog topic for the store and therefore bypasses the processor (as well as deserialization) during restoration.

The reason that state restoration bypasses any processing is that usually the data in a changelog is identical to the data in the store, so it would actually be wrong to do anything new to it. Plus, it's more efficient just to take the bytes off the wire and bulk-write them into the state stores. I say "usually" because in this case, the input topic isn't exactly like a normal changelog topic, in that it doesn't receive its writes during store puts.

For what it's worth, I also struggle to understand the use case. Seemingly, we should either:

  1. Get rid of that processor entirely, and always just dump the binary data off the wire into the stores, just like restoration does.
  2. Re-design global stores to allow arbitrary transformations before the global store. We could either:
    • continue to use the input topic and deserialize and invoke the processors during restoration as well, OR
    • add a real changelog for global stores, such that we'd poll the input topic, apply some transformations, then write to the global store and the global-store-changelog. Then, we can use the changelog (not the input) for restoration and replication.

By the way, if you want the latter behavior, you can approximate it right now by applying your transformations and then using to(my-global-changelog) to manufacture a "changelog" topic. Then, you'd create the global store to read from your my-global-changelog instead of the input.
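
To make that concrete, here is one way to approximate it, as a hedged sketch rather than a definitive implementation: the topic names "input-topic" and "my-global-changelog", the store name "my-global-store", and the mapValues transformation are placeholders. The transformation runs in the regular part of the topology, its output is written to the dedicated "changelog" topic, and the global store is then materialized from that pre-transformed topic (here via the DSL's globalTable), so restoration replays records that need no further processing.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class GlobalChangelogWorkaround {

        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();

            // 1. Apply the transformation in a regular (non-global) part of the
            //    topology and write the result to a dedicated "changelog" topic.
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                   .mapValues(value -> value.toUpperCase())   // stand-in for the real transformation
                   .to("my-global-changelog", Produced.with(Serdes.String(), Serdes.String()));

            // 2. Materialize the global store from the already-transformed topic,
            //    so both normal processing and restoration read the same data.
            GlobalKTable<String, String> global = builder.globalTable(
                    "my-global-changelog",
                    Consumed.with(Serdes.String(), Serdes.String()),
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("my-global-store"));

            return builder;
        }
    }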

So, to give you a direct answer, KAFKA-7663 is not a bug. I'll comment on the ticket proposing to turn it into a feature request.

Bonus answer: Topics that act as changelogs for state stores must not be configured with retention. Practically speaking, this means you should prevent infinite growth by enabling compaction and disabling log retention.
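
As an illustration, a compacted topic can be created with the AdminClient, for example as in the sketch below; the topic name, partition count, replication factor, and broker address are placeholders.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateCompactedTopic {

        public static void main(final String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // cleanup.policy=compact: old values per key are compacted away
                // instead of whole segments being dropped by time-based retention.
                NewTopic topic = new NewTopic("source-topic", 1, (short) 1)
                        .configs(Collections.singletonMap(
                                TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }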

In practice, old data falling out of retention and getting dropped is not an "event", and consumers have no way of knowing if/when it happens. Therefore, it's not possible to remove data from the state stores in response to this non-event. It would happen as you describe... the records would just sit there in the global store indefinitely. If/when an instance is replaced, the new one would restore from the input and (obviously) only receive records that exist in the topic at that time. Thus, the Streams cluster as a whole would wind up with an inconsistent view of the global state. That's why you should disable retention.

The right way to "drop" old data from the store would be to just write a tombstone for the desired key into the input topic. This would then be correctly propagated to all members of the cluster, applied correctly during restoration, AND correctly compacted by the brokers.
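
For example (a small sketch; the topic name, key, and String serializers are placeholders), a plain producer can send a record with a null value, which Kafka treats as a tombstone for that key:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class WriteTombstone {

        public static void main(final String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // A null value is a tombstone: compaction eventually removes the key
                // from the topic, and the store-update processor can delete it from
                // the global store when it sees the null value.
                producer.send(new ProducerRecord<>("source-topic", "key-to-delete", null));
                producer.flush();
            }
        }
    }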

I hope this all helps. Definitely, please chime in on the ticket and help us shape the API to be more intuitive!
