Ideal way to enrich a KStream with lookup data


Problem Description

My stream has a column called 'category' and I have additional static metadata for each 'category' in a different store; it gets updated once every couple of days. What is the right way to do this lookup? There are two options with Kafka Streams:

  1. Load static data outside of Kafka Streams and just use KStream#map() to add metadata. This is possible as Kafka Streams is just a library.

  2. Load the metadata to a Kafka topic, load it into a KTable and do KStream#leftJoin(); this seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

    • For example, say initially there was just one category 'c1'. The Kafka Streams app was stopped gracefully and restarted. After the restart, a new category 'c2' was added. My assumption is that table = KStreamBuilder().table('metadataTopic') would just have the value 'c2', as that was the only thing that changed since the app started for the second time. I would want it to have both 'c1' and 'c2'.
    • If it does have 'c1' as well, would the data ever be removed from the KTable (perhaps by sending a key = null message)?

Which of the above is the right way to look up metadata?

Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?

Is there another way using stores?

Solution

  1. Load static data outside of Kafka Streams and just use KStream#map() to add metadata. This is possible as Kafka Streams is just a library.
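
A minimal sketch of what this first option can look like is shown below, using the current Kafka Streams DSL. The topic names, the in-memory map, and the loadMetadataFromExternalStore() helper are hypothetical, for illustration only; mapValues() is used rather than map() because the key does not change:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Map;

public class MapEnrichmentSketch {

    public static void build(StreamsBuilder builder) {
        // Side data loaded once (or refreshed periodically) outside of Kafka Streams;
        // loadMetadataFromExternalStore() is a hypothetical helper for this sketch.
        Map<String, String> categoryMetadata = loadMetadataFromExternalStore();

        KStream<String, String> events = builder.stream("events");

        // mapValues() keeps the key unchanged, so no repartitioning is triggered.
        KStream<String, String> enriched = events.mapValues(
            (category, value) ->
                value + ", metadata=" + categoryMetadata.getOrDefault(category, "n/a"));

        enriched.to("enriched-events");
    }

    private static Map<String, String> loadMetadataFromExternalStore() {
        // Stand-in for e.g. a database query or a file read.
        return Map.of("c1", "metadata-for-c1");
    }
}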

This works. But usually people opt for the next option you listed, because the side data to enrich the input stream with is typically not fully static; rather, it is changing but somewhat infrequently:

  1. Load the metadata to a Kafka topic, load it into a KTable and do KStream#leftJoin(); this seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

This is the usual approach, and I'd recommend sticking to it unless you have a specific reason not to.
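
For concreteness, here is a minimal sketch of this second option, using the current StreamsBuilder API rather than the older KStreamBuilder from your question; the topic names, serdes, and join logic are assumptions for the example:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class KTableJoinEnrichment {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Event stream, keyed by category so that it is co-partitioned with the table.
        KStream<String, String> events = builder.stream("events");

        // Lookup data as a table: for each key, only the latest value is kept.
        KTable<String, String> metadata = builder.table("category-metadata");

        // Left join: events with no matching metadata pass through with a null right side.
        KStream<String, String> enriched = events.leftJoin(
            metadata,
            (eventValue, metadataValue) ->
                eventValue + ", metadata=" + (metadataValue != null ? metadataValue : "n/a"));

        enriched.to("enriched-events");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}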

However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

So I guess you also prefer the second option, but you are concerned about whether or not this is efficient.

Short answer is: Yes, the KTable will be loaded with all the (latest) values per key. The table will contain the entire lookup data, but keep in mind that the KTable is partitioned behind the scenes: if, for example, your input topic (for the table) has 3 partitions, then you can run up to 3 instances of your application, each of which gets 1 partition of the table (assuming data is spread evenly across partitions, each partition/shard of the table would then hold about 1/3 of the table's data). So in practice, more likely than not, it "just works". I share more details below.

Global KTables: Alternatively, you can use global KTables instead of the (partitioned) normal table variant. With global tables, every instance of your application has a full copy of the table data. This makes global tables very useful for join scenarios, including for enriching a KStream as per your question.
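
A sketch of the global-table variant is below (topic names and the category extractor are again assumptions for the example). Note that the join against a GlobalKTable takes an extra KeyValueMapper, so the stream does not even have to be keyed by category:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableSketch {

    public static void build(StreamsBuilder builder) {
        KStream<String, String> events = builder.stream("events");

        // Every application instance holds a full copy of this table.
        GlobalKTable<String, String> metadata = builder.globalTable("category-metadata");

        KStream<String, String> enriched = events.leftJoin(
            metadata,
            // Map each stream record to the table's key (hypothetical extractor).
            (eventKey, eventValue) -> extractCategory(eventValue),
            (eventValue, metadataValue) ->
                eventValue + ", metadata=" + (metadataValue != null ? metadataValue : "n/a"));

        enriched.to("enriched-events");
    }

    private static String extractCategory(String eventValue) {
        // Hypothetical: assume the category is the first comma-separated field.
        return eventValue.split(",")[0];
    }
}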

Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?

You don't need to worry about that. Simply put, if there is no local "copy" of the table available, then the Streams API would automatically ensure that the table's data is read fully from scratch. If there is a local copy available, then your application will re-use that copy (and update its local copy whenever new data is available in the table's input topic).

Longer answer with examples

Imagine the following input data (think: changelog stream) for your KTable; note how this input consists of 6 messages:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600) -> (alice, 5) -> (bob, 22)

And here are the various states of the "logical" KTable that would result from this input, where each newly received input message (such as (alice, 1)) results in a new state of the table:

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600

What you can see here is that, even though the input data may have many, many messages (or "changes" as you said; here, we have 6), the number of entries/rows in the resulting KTable (which is undergoing continuous mutations based on the newly received input) is the number of unique keys in the input (here: starting out with 1, ramping up to 3), which typically is significantly less than the number of messages. So, if the number of messages in the input is N and the number of unique keys for these messages is M, then typically M << N (M is significantly smaller than N; plus, for the record, we have the invariant M <= N).

This is the first reason why "this requires us to keep the KTable loaded with all the values" is typically not an issue, because only the latest value is retained per key.
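
If you want to observe this "latest value per key" behavior directly, a small self-contained test along the following lines (using the kafka-streams-test-utils artifact; the store name is made up for the example) replays the six messages from above and prints the final table contents:

import java.util.Properties;

import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class LatestValuePerKeyDemo {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Materialize the table into a named store so we can inspect it below.
        KTable<String, Long> table =
            builder.table("metadataTopic", Materialized.as("metadata-store"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-value-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Long> input = driver.createInputTopic(
                "metadataTopic", new StringSerializer(), new LongSerializer());

            // The six input messages from the example above.
            input.pipeInput("alice", 1L);
            input.pipeInput("bob", 40L);
            input.pipeInput("alice", 2L);
            input.pipeInput("charlie", 600L);
            input.pipeInput("alice", 5L);
            input.pipeInput("bob", 22L);

            KeyValueStore<String, Long> store = driver.getKeyValueStore("metadata-store");
            System.out.println(store.get("alice"));   // 5   (only the latest value survives)
            System.out.println(store.get("bob"));     // 22
            System.out.println(store.get("charlie")); // 600
        }
    }
}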

The second reason that helps is that, as Matthias J. Sax has pointed out, Kafka Streams uses RocksDB as the default storage engine for such tables (more precisely: the state stores that back a table). RocksDB allows you to maintain tables that are larger than the available main memory / Java heap space of your application because it can spill to local disk.

Lastly, the third reason is that a KTable is partitioned. So, if your input topic for the table is (say) configured with 3 partitions, then what's happening behind the scenes is that the KTable itself is partitioned (think: sharded) in the same way. In the example above, here's what you could end up with, though the exact "splits" depend on how the original input data is spread across the partitions of the table's input topic:

Logical KTable (last state of what I showed above):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600

Actual KTable, partitioned (assuming 3 partitions for the table's input topic, plus keys=usernames being spread evenly across partitions):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3

In practice, this partitioning of the input data -- among other things -- allows you to "size" the actual manifestations of a KTable.

Another example:

  • Imagine the latest state of your KTable would typically have a size of 1 TB (again, the approximate size is a function of the number of unique message keys in the table's input data, multiplied by the average size of the associated message value).
  • If the table's input topic has only 1 partition, then the KTable itself also has only 1 partition, with a size of 1 TB. Here, because the input topic has but 1 partition, you could run your application with at most 1 app instance (so not really a whole lot of parallelism, heh).
  • If the table's input topic has 500 partitions, then the KTable has 500 partitions, too, with a size of ~ 2 GB each (assuming data is evenly spread across the partitions). Here, you could run your application with up to 500 app instances. If you were to run exactly 500 instances, then each app instance would get exactly 1 partition/shard of the logical KTable, thus ending up with 2 GB of table data; if you were to run only 100 instances, then each instance would get 500 / 100 = 5 partitions/shards of the table, ending up with about 2 GB * 5 = 10 GB of table data.
