Ideal way to enrich a KStream with lookup data


Question

My stream has a column called 'category' and I have additional static metadata for each 'category' in a different store, it gets updated once every couple of days. What is the right way to do this lookup? There are two options with Kafka streams

  1. Load static data outside of Kafka Streams and just use KStreams#map() to add metadata. This is possible as Kafka Streams is just a library.

  2. Load the metadata to a Kafka topic, load it to a KTable and do KStreams#leftJoin(); this seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

  • For example, say originally there was only one category, 'c1'. The Kafka Streams app was stopped gracefully, and restarted again. After the restart, a new category 'c2' was added. My assumption is that table = KStreamBuilder().table('metadataTopic') would only have the value 'c2', because that was the only thing that changed since the app was started the second time. I would want it to have 'c1' and 'c2'.
  • If it does have 'c1' as well, would the data ever be removed from the KTable (perhaps by setting send key = null message?)?

Which of the above is the right way to lookup metadata?

Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?

Is there another way using stores?

Answer

  1. Load static data outside of Kafka Streams and just use KStreams#map() to add metadata. This is possible as Kafka Streams is just a library.

This works. But usually people opt for the next option you listed, because the side data to enrich the input stream with is typically not fully static; rather, it is changing but somewhat infrequently:

  2. Load the metadata to a Kafka topic, load it to a KTable and do KStreams#leftJoin(); this seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

This is the usual approach, and I'd recommend to stick to it unless you have a specific reason not to.
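In the Java DSL this is a KStream#leftJoin(KTable) over the metadata topic. As a minimal, library-free sketch of the join semantics (the topic contents, keys, and record shapes below are made up for illustration): the table is simply the latest value per key, and every stream record is emitted, enriched with whatever the table currently holds for its key, or with nothing if the key is absent.

```python
def left_join(stream_records, table_changelog):
    """Sketch of KStream#leftJoin(KTable) semantics, without Kafka."""
    table = {}  # KTable state: the latest value wins per key
    for key, value in table_changelog:
        table[key] = value
    # Left join: every stream record is emitted, enriched where possible
    return [(key, value, table.get(key)) for key, value in stream_records]

metadata = [("c1", {"label": "books"}), ("c2", {"label": "games"})]
purchases = [("c1", 9.99), ("c3", 4.50)]
enriched = left_join(purchases, metadata)
# c1 picks up its metadata; c3 has none, so its join value is None
print(enriched)
```

In a real topology the table updates and stream records are interleaved by timestamp rather than applied up front, but the per-record lookup against the table's current state is the same idea.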

However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

So I guess you also prefer the second option, but you are concerned about whether or not this is efficient.

Short answer is: Yes, the KTable will be loaded with all the (latest) values per key. The table will contain the entire lookup data, but keep in mind that the KTable is partitioned behind the scenes: if, for example, your input topic (for the table) has 3 partitions, then you can run up to 3 instances of your application, each of which getting 1 partition of the table (assuming data is spread evenly across partitions, then each partition/shard of the table would hold about 1/3 of the table's data). So in practice more likely than not it "just works". I share more details below.

Global KTables: Alternatively, you can use global KTables instead of the (partitioned) normal table variant. With global tables every instance of your application has a full copy of the table data. This makes global tables very useful for join scenarios, including for enriching a KStream as per your question.
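The difference in data placement can be pictured with a small sketch (plain Python; the keys, values, and instance count are hypothetical): a partitioned KTable assigns each app instance one shard of the data, while a GlobalKTable replicates the full data set to every instance.

```python
# Sketch: partitioned KTable vs. GlobalKTable data placement.
table = {"alice": 5, "bob": 22, "charlie": 600}
num_instances = 3

def shard_for(key):
    # Stand-in for Kafka's partitioner (hash of the key mod partition count)
    return hash(key) % num_instances

# Partitioned KTable: each app instance holds only its own shard
shards = [{k: v for k, v in table.items() if shard_for(k) == i}
          for i in range(num_instances)]

# GlobalKTable: each app instance holds a full copy of the table
global_copies = [dict(table) for _ in range(num_instances)]

assert sum(len(s) for s in shards) == len(table)     # split, not duplicated
assert all(copy == table for copy in global_copies)  # fully replicated
```

The trade-off: global tables let any instance join on any key (no co-partitioning requirement), at the cost of storing the whole table on every instance.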

Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?

You don't need to worry about that. Simply put, if there is no local "copy" of the table available, then the Streams API would automatically ensure that the table's data is read fully from scratch. If there is a local copy available, then your application will re-use that copy (and update its local copy whenever new data is available in the table's input topic).

More details by example

Imagine the following input data (think: changelog stream) for your KTable; note how this input consists of 6 messages:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)

And here are the various states of the "logical" KTable that would result from this input, where each newly received input message (such as (alice, 1)) would result in a new state of the table:

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600

What you can see here is that, even though the input data may have many, many messages (or "changes" as you said; here, we have 6), the number of entries/rows in the resulting KTable (which is undergoing continuous mutations based on the newly received input) is the number of unique keys in the input (here: starting out with 1, ramping up to 3), which typically is significantly less than the number of messages. So, if the number of messages in the input is N and the number of unique keys for these messages is M, then typically M << N (M is significantly smaller than N; plus, for the record, we have the invariant M <= N).
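The walk-through above can be reproduced in a few lines, with a plain Python dict standing in for the KTable's state store:

```python
# The 6-message changelog from the example above
changelog = [("alice", 1), ("bob", 40), ("alice", 2),
             ("charlie", 600), ("alice", 5), ("bob", 22)]

ktable = {}
for key, value in changelog:   # each update overwrites the previous value
    ktable[key] = value

n = len(changelog)             # N = 6 messages in the input
m = len(ktable)                # M = 3 unique keys retained in the table
print(ktable)  # {'alice': 5, 'bob': 22, 'charlie': 600}
assert m <= n                  # the invariant M <= N
```

Only the final value per key survives, which is exactly the final table state shown above.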

This is the first reason why "this requires us to keep the KTable loaded with all the values" is typically not an issue, because only the latest value is retained per key.

The second reason that helps is that, as Matthias J. Sax has pointed out, Kafka Streams uses RocksDB as the default storage engine for such tables (more precisely: the state stores that back a table). RocksDB allows you to maintain tables that are larger than the available main memory / Java heap space of your application because it can spill to local disk.

Lastly, the third reason is that a KTable is partitioned. So, if your input topic for the table is (say) configured with 3 partitions, then what's happening behind the scenes is that the KTable itself is partitioned (think: sharded) in the same way. In the example above, here's what you could end up with, though the exact "splits" depend on how the original input data is spread across the partitions of the table's input topic:

Logical KTable (last state of what I showed above):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600

Actual KTable, partitioned (assuming 3 partitions for the table's input topic, plus keys=usernames being spread evenly across partitions):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3

In practice, this partitioning of the input data -- among other things -- allows you to "size" the actual manifestations of a KTable.

Another example:

  • Imagine the latest state of your KTable would typically have a size of 1 TB (again, the approximate size is a function of the number of unique message keys in the table's input data, multiplied by the average size of the associated message value).
  • If the table's input topic has only 1 partition, then the KTable itself also has only 1 partition, with a size of 1 TB. Here, because the input topic has but 1 partition, you could run your application with up to 1 app instance (so not really a whole lot of parallelism, heh).
  • If the table's input topic has 500 partitions, then the KTable has 500 partitions, too, with a size of ~ 2 GB each (assuming data is evenly spread across the partitions). Here, you could run your application with up to 500 app instances. If you were to run exactly 500 instances, then each app instance would get exactly 1 partition/shard of the logical KTable, thus ending up with 2 GB of table data; if you were to run only 100 instances, then each instance would get 500 / 100 = 5 partitions/shards of the table, ending up with about 2 GB * 5 = 10 GB of table data.
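The arithmetic in the bullets above, spelled out (the 1 TB table size and partition counts are the hypothetical figures from the example, and even spreading of data across partitions is assumed):

```python
table_size_gb = 1000                            # ~1 TB logical KTable
partitions = 500
per_partition_gb = table_size_gb / partitions   # ~2 GB per shard

def data_per_instance(num_instances):
    # Each instance is assigned partitions // num_instances shards
    # (assuming num_instances divides partitions evenly)
    return (partitions // num_instances) * per_partition_gb

print(data_per_instance(500))  # 2.0  -> one shard per instance
print(data_per_instance(100))  # 10.0 -> five shards per instance
```

So scaling out the app (up to one instance per partition) proportionally shrinks the slice of table data each instance has to hold.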
