Kafka Streams use cases for addGlobalStore

Question

When defining a topology in Kafka Streams, a global state store can be added. It needs a source topic as well as a ProcessorSupplier. The processor receives records and could theoretically transform them before adding them to the store. But in case of restoration, the records are inserted directly from the source topic (changelog) into the global state store, skipping any transformation done in the processor.

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier) Adds a global StateStore to the topology.
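
For context, a minimal sketch of registering such a global store with this API (the topic name, store name, and String serdes are made up for illustration) could look roughly like this:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalStoreTopology {

        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();

            // Global stores must have changelogging disabled: the source topic
            // itself serves as the changelog.
            StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                          Stores.inMemoryKeyValueStore("global-store"),
                          Serdes.String(), Serdes.String())
                      .withLoggingDisabled();

            builder.addGlobalStore(
                storeBuilder,
                "source-topic",                                  // hypothetical topic name
                Consumed.with(Serdes.String(), Serdes.String()),
                () -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(ProcessorContext context) {
                        super.init(context);
                        store = (KeyValueStore<String, String>) context.getStateStore("global-store");
                    }

                    @Override
                    public void process(String key, String value) {
                        // Per the documentation: write the record as-is, no transformation,
                        // because restoration bypasses this processor entirely.
                        store.put(key, value);
                    }
                });

            return builder;
        }
    }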

According to the documentation:

NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date.

In parallel, a major bug is currently open on the Kafka bug tracker: KAFKA-7663, "Custom Processor supplied on addGlobalStore is not used when restoring state from topic", which describes exactly what is stated in the documentation, but seems to be an accepted bug.

I am wondering whether KAFKA-7663 is indeed a bug or not. According to the documentation, it seems to have been designed like this, in which case I struggle to understand the use case.
Can someone explain the major use cases of this low-level API? The only thing I can think of is handling side effects, for example doing some logging in the processor.

Bonus question: if the source topic acts as the changelog of the global store, when a record is deleted from the topic because retention has expired, will it be removed from the global state store? Or will the removal only take place in the store after a full restoration from the changelog?

Answer

Yeah, this is quite a weird little catch-22, but the documentation is correct. The Processor for a global state store must not do anything to the records but persist them into the store.

AFAIK, this isn't a philosophical issue, just a practical one. The reason is simply the behavior you observe... Streams treats the input topic as a changelog topic for the store and therefore bypasses the processor (as well as deserialization) during restoration.

The reason that state restoration bypasses any processing is that usually the data in a changelog is identical to the data in the store, so it would actually be wrong to do anything new to it. Plus, it's more efficient just to take the bytes off the wire and bulk-write them into the state stores. I say "usually" because in this case, the input topic isn't exactly like a normal changelog topic, in that it doesn't receive its writes during store puts.

For what it's worth, I also struggle to understand the use case. Seemingly, we should either:

  1. Get rid of that processor entirely, and always just dump the binary data off the wire into the stores, just like restoration does.
  2. Re-design global stores to allow arbitrary transformations before the global store. We could either:
    • continue to use the input topic and deserialize and invoke the processors during restoration as well, OR
    • add a real changelog for global stores, such that we'd poll the input topic, apply some transformations, then write to the global store and the global-store-changelog. Then, we can use the changelog (not the input) for restoration and replication.

By the way, if you want the latter behavior, you can approximate it right now by applying your transformations and then using to(my-global-changelog) to manufacture a "changelog" topic. Then, you'd create the global store to read from your my-global-changelog instead of the input.
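
A rough sketch of that workaround, assuming String serdes, a placeholder transformation, and hypothetical topic names, might look like this:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TransformThenGlobalStore {

        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();

            // 1. Apply the transformation in a regular stream and write the result
            //    to a separate, compacted topic that acts as the real changelog.
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                   .mapValues(value -> value.toUpperCase())   // placeholder transformation
                   .to("my-global-changelog", Produced.with(Serdes.String(), Serdes.String()));

            // 2. Materialize the global store from the already-transformed topic, so
            //    restoration replays exactly what the store is supposed to contain.
            builder.globalTable(
                "my-global-changelog",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global-store"));

            return builder;
        }
    }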

So, to give you a direct answer, KAFKA-7663 is not a bug. I'll comment on the ticket proposing to turn it into a feature request.

Bonus answer: Topics that act as changelogs for state stores must not be configured with retention. Practically speaking, this means you should prevent infinite growth by enabling compaction, and disable log retention.
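
As an illustration, creating such a topic with the AdminClient (broker address, topic name, and sizing are assumptions) could look something like this:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateCompactedTopic {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker address

            try (AdminClient admin = AdminClient.create(props)) {
                NewTopic topic = new NewTopic("source-topic", 3, (short) 1)   // hypothetical name/sizing
                    .configs(Map.of(
                        // compaction keeps the latest record per key forever
                        TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                        // disable time-based retention; records only go away via
                        // compaction, i.e. when a newer value or a tombstone arrives
                        TopicConfig.RETENTION_MS_CONFIG, "-1"));

                admin.createTopics(List.of(topic)).all().get();
            }
        }
    }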

In practice, old data falling out of retention and getting dropped is not an "event", and consumers have no way of knowing if/when it happens. Therefore, it's not possible to remove data from the state stores in response to this non-event. It would happen as you describe... the records would just sit there in the global store indefinitely. If/when an instance is replaced, the new one would restore from the input and (obviously) only receive records that exist in the topic at that time. Thus, the Streams cluster as a whole would wind up with an inconsistent view of the global state. That's why you should disable retention.

The right way to "drop" old data from the store would be to just write a tombstone for the desired key into the input topic. This would then be correctly propagated to all members of the cluster, applied correctly during restoration, AND correctly compacted by the brokers.
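
As a sketch, producing such a tombstone with a plain producer (topic name, key, and broker address are hypothetical) could look like this:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class DeleteFromGlobalStore {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");          // assumed broker address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // A null value is a tombstone: every running instance applies the delete,
                // restoring instances replay it, and the broker eventually compacts the key away.
                producer.send(new ProducerRecord<>("source-topic", "key-to-delete", null));
                producer.flush();
            }
        }
    }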

I hope this all helps. Definitely, please chime in on the ticket and help us shape the API to be more intuitive!
