How to manage a Kafka KStream-to-KStream windowed join?

Question

According to the Apache Kafka docs, KStream-to-KStream joins are always windowed joins. My question is: how can I control the size of the window? Is it the same as the retention period of the data on the topic? Or, for example, can we keep data for 1 month but join the streams over just the past week?

Is there a good example of a windowed KStream-to-KStream join?

In my case, say I have two KStreams, kstream1 and kstream2, and I want to join 10 days of kstream1 with 30 days of kstream2.

Answer

That is absolutely possible. When you define your stream operator, you specify the join window size explicitly.

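import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;

// Old Kafka Streams API (0.10.x/1.x): JoinWindows.of(long) and until(long)
// were deprecated in 2.1.
KStream<String, String> stream1 = ...;
KStream<String, String> stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

// Illustrative ValueJoiner (String types and concatenation are assumptions);
// with leftJoin(), right may be null when nothing matched in the window
ValueJoiner<String, String, String> joiner = (left, right) -> left + "," + right;

KStream<String, String> joined = stream1.leftJoin(stream2,
                 joiner,
                 JoinWindows.of(joinWindowSizeMs)
);

// or if you want to set the retention time explicitly
KStream<String, String> joinedWithRetention = stream1.leftJoin(stream2,
                 joiner,
                 (JoinWindows) JoinWindows.of(joinWindowSizeMs)
                                          .until(windowRetentionTimeMs)
);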
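Both millisecond constants above rely on the L suffix. Without it, the 30-day product is evaluated in 32-bit int arithmetic and silently overflows before it is widened to long. The sketch below is plain Java with no Kafka dependency, and its class and constant names are hypothetical. It checks this and expresses the same spans as java.time.Duration. Duration is what newer Kafka Streams releases take instead of raw milliseconds. As far as I know, JoinWindows.of(Duration) and grace() (which replaced until() for bounding late records) arrived in 2.1. Release 3.0 then added JoinWindows.ofTimeDifferenceWithNoGrace(Duration) and ofTimeDifferenceAndGrace(Duration, Duration). Please verify against the Javadoc for your version.

```java
import java.time.Duration;

public class WindowSizes {
    // Same arithmetic as the answer: the L suffix makes every factor a long,
    // so the product is computed in 64-bit arithmetic.
    public static final long JOIN_WINDOW_SIZE_MS = 5L * 60L * 1000L;                  // 5 minutes
    public static final long WINDOW_RETENTION_TIME_MS = 30L * 24L * 60L * 60L * 1000L; // 30 days

    // Without the suffix the product is evaluated in 32-bit int arithmetic
    // and overflows before being widened to long.
    public static final long OVERFLOWED_MS = 30 * 24 * 60 * 60 * 1000;

    // java.time.Duration expresses the same spans without manual unit math.
    public static final Duration JOIN_WINDOW = Duration.ofMinutes(5);
    public static final Duration RETENTION = Duration.ofDays(30);

    public static void main(String[] args) {
        System.out.println(JOIN_WINDOW_SIZE_MS + " == " + JOIN_WINDOW.toMillis());
        System.out.println(WINDOW_RETENTION_TIME_MS + " == " + RETENTION.toMillis());
        System.out.println("int overflow: " + OVERFLOWED_MS);
    }
}
```

Running main prints 300000 == 300000, 2592000000 == 2592000000 and int overflow: -1702967296.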

See http://docs.confluent.io/current/streams/developer-guide.html#joining-streams for more details.

The sliding window basically defines an additional join predicate. In SQL-like syntax this would be something like:

SELECT * FROM stream1, stream2
WHERE
   stream1.key = stream2.key
   AND
   stream1.ts - before <= stream2.ts
   AND
   stream2.ts <= stream1.ts + after
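To make the predicate concrete, here is a minimal in-memory sketch of it as a nested-loop inner join. It is plain Java 16+ with no Kafka dependency, and the Rec record and join method are hypothetical names. This is not how Kafka Streams evaluates joins internally. Also, the answer's leftJoin would additionally emit unmatched stream1 records paired with null.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedJoinSketch {
    // Hypothetical record type: a key, a value and an event timestamp in ms.
    public record Rec(String key, String value, long ts) {}

    // Inner join: keys equal AND left.ts - before <= right.ts <= left.ts + after.
    public static List<String> join(List<Rec> left, List<Rec> right, long before, long after) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean inWindow = l.ts() - before <= r.ts() && r.ts() <= l.ts() + after;
                if (sameKey && inWindow) {
                    out.add(l.key() + ":" + l.value() + "+" + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long window = 5L * 60L * 1000L; // before == after == 5 minutes
        List<Rec> s1 = List.of(new Rec("A", "a1", 1_000_000L));
        List<Rec> s2 = List.of(
                new Rec("A", "x", 1_000_000L + window),     // exactly at the upper bound: joins
                new Rec("A", "y", 1_000_000L + window + 1), // 1 ms outside the window: no join
                new Rec("B", "z", 1_000_000L));             // different key: no join
        System.out.println(join(s1, s2, window, window));   // [A:a1+x]
    }
}
```

Both bounds are inclusive here, matching the <= comparisons in the SQL-like predicate.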

In this example, before == after == joinWindowSizeMs. The two bounds can also differ if you set them explicitly with JoinWindows#before() and JoinWindows#after().
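The following is one hypothetical reading of the question's "30 days of kstream2": an asymmetric window in which each kstream1 record looks back up to 30 days into kstream2 (before = 30 days) but never forward (after = 0). The matches helper below is a made-up name that mirrors the predicate. In Kafka Streams itself the bounds are set through JoinWindows#before()/after() as described above. The window is always relative to each record's own timestamp, not to "the last N days" of the topic.

```java
public class AsymmetricWindow {
    public static final long DAY_MS = 24L * 60L * 60L * 1000L;

    // Hypothetical helper mirroring: stream1.ts - before <= stream2.ts <= stream1.ts + after
    public static boolean matches(long stream1Ts, long stream2Ts, long beforeMs, long afterMs) {
        return stream1Ts - beforeMs <= stream2Ts && stream2Ts <= stream1Ts + afterMs;
    }

    public static void main(String[] args) {
        long before = 30 * DAY_MS; // look back up to 30 days into stream2
        long after = 0L;           // but never forward
        long t1 = 100 * DAY_MS;    // timestamp of some stream1 record

        System.out.println(matches(t1, t1 - 30 * DAY_MS, before, after)); // true (lower bound)
        System.out.println(matches(t1, t1 - 31 * DAY_MS, before, after)); // false (too old)
        System.out.println(matches(t1, t1 + 1, before, after));          // false (in the future)
    }
}
```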

The retention time of the source topics is completely independent of the specified windowRetentionTimeMs. That setting applies to the changelog topic that Kafka Streams itself creates for the join's window store. Window retention allows out-of-order records, i.e., records that arrive late, to still be joined with each other. Keep in mind that Kafka guarantees ordering by offset, but with regard to timestamps, records can be out of order.
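To illustrate why retention matters for late records, here is a deliberately simplified, hypothetical model in plain Java. It is not Kafka Streams' actual window store, and the class and method names are made up. Buffered right-side records are evicted once they fall more than retentionMs behind the highest timestamp seen so far. A left-side record that arrives late can therefore only join records that are still buffered.

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionSketch {
    // Hypothetical, simplified model of a join buffer with a retention period.
    private final long retentionMs;
    private final List<Long> storedTs = new ArrayList<>(); // timestamps of buffered right-side records
    private long streamTime = Long.MIN_VALUE;               // highest timestamp observed so far

    public RetentionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Buffer a right-side record, then evict anything older than streamTime - retention.
    public void put(long ts) {
        streamTime = Math.max(streamTime, ts);
        storedTs.add(ts);
        long cutoff = streamTime - retentionMs;
        storedTs.removeIf(t -> t < cutoff);
    }

    // A (possibly late) left-side record at ts matches buffered records within +/- windowMs.
    public int countMatches(long ts, long windowMs) {
        int n = 0;
        for (long t : storedTs) {
            if (ts - windowMs <= t && t <= ts + windowMs) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        for (long retention : new long[] {30 * min, 10 * min}) {
            RetentionSketch store = new RetentionSketch(retention);
            store.put(0L);       // right record at t = 0
            store.put(20 * min); // a later right record advances stream time to 20 min
            // a left record with ts = 1 min arrives late, after the 20-min record
            int matches = store.countMatches(1 * min, 5 * min);
            System.out.println((retention / min) + " min retention -> " + matches + " match(es)");
        }
    }
}
```

With 30 minutes of retention the late record still finds its partner at t = 0 (1 match). With 10 minutes, that partner has already been evicted (0 matches), even though both records lie within the same 5-minute join window.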
