Kafka Stream: KTable materialization

Question

How to identify when the KTable materialization to a topic has completed?

For example, assume the KTable has a few million rows. Pseudo code below:

KTable<String, String> kt = stream.groupByKey().reduce((oldValue, newValue) -> newValue); // assume this produces a few million rows

At some point in time, I want to schedule a thread to invoke the following, which writes to the topic: kt.toStream().to("output_topic_name");

I want to ensure all the data is written as part of the above invocation. Also, once the above "to" method has been invoked, can it be invoked again in the next schedule, or will the first invocation always stay active?

Follow-up questions:

Constraints

1) Ok, I see that the KStream and the KTable are unbounded/infinite once the KafkaStreams instance is kicked off. However, wouldn't the KTable materialization (to a compacted topic) send multiple entries for the same key within a specified period?

So, unless the compaction process attempts to clean these up and retain only the latest one, the downstream application will consume all available entries for the same key when querying from the topic, causing duplicates. Even if the compaction process does some level of cleanup, at any given point in time there may still be keys with more than one entry while compaction is catching up.

I assume the KTable will only have one record for a given key in RocksDB. If we have a way to schedule the materialization, that will help to avoid the duplicates. Also, it would reduce the amount of data being persisted in the topic (which increases storage), the network traffic, and the additional overhead for the compaction process to clean it up.

2) Perhaps a ReadOnlyKeyValueStore would allow a controlled retrieval from the store, but it still lacks a way to schedule the retrieval of keys and values and write them to a topic, which requires additional coding.
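For reference, a minimal sketch of that kind of store retrieval via interactive queries (the store name "kt-store" is a hypothetical assumption and requires the reduce() above to be materialized, e.g. via Materialized.as("kt-store"); the scheduling logic and the producer that would write to a topic are left out):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// 'streams' is a running KafkaStreams instance; "kt-store" is a hypothetical store name.
ReadOnlyKeyValueStore<String, String> store =
        streams.store("kt-store", QueryableStoreTypes.keyValueStore());

try (KeyValueIterator<String, String> it = store.all()) {
    while (it.hasNext()) {
        KeyValue<String, String> entry = it.next();
        // entry.key / entry.value hold the latest state for each key;
        // writing them to a topic still needs a separate producer and scheduling logic.
    }
}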

Can the API be improved to allow a controlled materialization?

Answer

A KTable materialization never finishes and you cannot "invoke" a to() either.

When you use the Streams API, you "plug together" a DAG of operators. The actual method calls don't trigger any computation; they modify the DAG of operators.

Data is processed only after you start the computation via KafkaStreams#start(). Note that all operators you specified will run continuously and concurrently after the computation is started.
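To make that concrete, here is a minimal sketch of how the question's pipeline would normally be wired (topic names, application id, and bootstrap servers are assumptions): to() is declared as part of the topology, and only start() kicks off the continuous processing.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class KTableMaterializationExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Building the topology only describes the DAG; no data is processed yet.
        KTable<String, String> kt = builder
                .<String, String>stream("input_topic_name")   // assumed input topic
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

        // to() is also just part of the DAG definition; it cannot be "invoked" later on a schedule.
        kt.toStream().to("output_topic_name");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-materialization-example"); // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");              // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();  // processing starts here and runs until close() is called
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}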

There is no "end of the computation" because the input is expected to be unbounded/infinite, as upstream applications can write new data into the input topics at any time. Thus, your program never terminates by itself. If required, you can stop the computation via KafkaStreams#close() though.

During execution, you cannot change the DAG. If you want to change it, you need to stop the computation and create a new KafkaStreams instance that takes the modified DAG as input.

Follow up:

Yes. You have to think of a KTable as a "versioned table" that evolves over time as entries are updated. Thus, all updates are written to the changelog topic and sent downstream as change-records (note that KTables do some caching, too, to "de-duplicate" consecutive updates to the same key: cf. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html).
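As a sketch of how that caching is tuned (building on the props object from the sketch above; the values are illustrative assumptions, not recommendations): a larger record cache and a longer commit interval mean fewer intermediate updates per key are flushed downstream and to the changelog topic.

// Illustrative caching settings; a larger cache and a longer commit interval reduce intermediate updates.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB record cache (assumed value)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);                   // flush at most every 30 seconds (assumed value)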

"will consume all available entries for the same key when querying from the topic, causing duplicates."

I would not consider those as "duplicates" but as updates. And yes, the application needs to be able to handle those updates correctly.

"if we have a way to schedule the materialization, that will help to avoid the duplicates."

Materialization is a continuous process and the KTable is updated whenever new input records are available in the input topic and processed. Thus, at any point in time there might be an update for a specific key. Hence, even if you have full control over when to send updates to the changelog topic and/or downstream, there might be a new update later on. That is the nature of stream processing.

"Also, it would reduce the amount of data being persisted in the topic (which increases storage), the network traffic, and the additional overhead for the compaction process to clean it up."

As mentioned above, caching is used to save resources.

"Can the API be improved to allow a controlled materialization?"

If the provided KTable semantics don't meet your requirement, you can always write a custom operator as a Processor or Transformer, attach a key-value store to it, and implement whatever you need.
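A sketch of one way to do that (the store name, topic names, and the 10-minute interval are assumptions, not part of the answer): a Transformer with an attached key-value store that keeps only the latest value per key and forwards the store contents on a wall-clock punctuation, which approximates the scheduled emission the question asks for.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ScheduledEmitExample {

    private static final String STORE_NAME = "latest-per-key-store"; // hypothetical store name

    public static void buildTopology(StreamsBuilder builder) {
        // A persistent key-value store that holds only the latest value per key.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME),
                        Serdes.String(), Serdes.String());
        builder.addStateStore(storeBuilder);

        builder.<String, String>stream("input_topic_name")   // assumed input topic
                .transform(new ScheduledEmitter(), STORE_NAME)
                .to("output_topic_name");                     // assumed output topic
    }

    // Buffers the latest value per key and forwards the whole store content
    // once per punctuation interval instead of on every update.
    static class ScheduledEmitter implements TransformerSupplier<String, String, KeyValue<String, String>> {
        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, String> store;

                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
                    // Emit the current store content every 10 minutes (interval is an assumption).
                    context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                        try (KeyValueIterator<String, String> it = store.all()) {
                            while (it.hasNext()) {
                                KeyValue<String, String> entry = it.next();
                                context.forward(entry.key, entry.value);
                            }
                        }
                    });
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    store.put(key, value); // only remember the latest value; emit later via punctuation
                    return null;           // suppress per-record output
                }

                @Override
                public void close() {}
            };
        }
    }
}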
