How to manage Kafka KStream to KStream windowed join?

Question

According to the Apache Kafka docs, KStream-to-KStream joins are always windowed joins. My question is: how can I control the size of the window? Is it the same as the retention period for the data on the topic? Or, for example, can we keep data for 1 month but join the streams over just the past week?

Is there a good example of a windowed KStream-to-KStream join?

In my case, let's say I have two KStreams, kstream1 and kstream2, and I want to be able to join 10 days of kstream1 to 30 days of kstream2.

Recommended answer

That is absolutely possible. When you define your join operator, you specify the join window size explicitly.

KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

// Note: JoinWindows.of(long) and until(long) are the older API; both have been
// deprecated since Kafka 2.1 in favor of Duration-based methods.
stream1.leftJoin(stream2,
                 ..., // add ValueJoiner
                 JoinWindows.of(joinWindowSizeMs)
);

// or, if you also want to set the retention time

stream1.leftJoin(stream2,
                 ..., // add ValueJoiner
                 (JoinWindows)JoinWindows.of(joinWindowSizeMs)
                                         .until(windowRetentionTimeMs)
);

See http://docs.confluent.io/current/streams/developer-guide.html#joining-streams for more details.
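The millisecond arithmetic in the snippet above can also be written with java.time.Duration, which is the argument type JoinWindows takes from Kafka Streams 2.1 onward. The sketch below is plain Java with no Kafka dependency, and the class and field names are illustrative only. It also spells out that JoinWindows.of(x) sets both the before and the after bound to x, so the effective match span is twice the configured size.

```java
import java.time.Duration;

// Plain-Java sketch (no Kafka dependency): the constants from the snippet above,
// expressed with java.time.Duration instead of hand-multiplied milliseconds.
class WindowConstants {
    static final Duration JOIN_WINDOW = Duration.ofMinutes(5); // joinWindowSizeMs
    static final Duration RETENTION = Duration.ofDays(30);     // windowRetentionTimeMs

    // JoinWindows.of(x) sets both the before and the after bound to x,
    // so a record can match partners within a span of 2 * x.
    static Duration totalSpan() {
        return JOIN_WINDOW.multipliedBy(2);
    }

    public static void main(String[] args) {
        System.out.println(JOIN_WINDOW.toMillis());  // prints 300000
        System.out.println(RETENTION.toMillis());    // prints 2592000000
        System.out.println(totalSpan().toMinutes()); // prints 10
    }
}
```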

The sliding window basically defines an additional join predicate. In SQL-like syntax this would be something like:

SELECT * FROM stream1, stream2
WHERE
   stream1.key = stream2.key
   AND
   stream1.ts - before <= stream2.ts
   AND
   stream2.ts <= stream1.ts + after

where before == after == joinWindowSizeMs in this example. The before and after bounds can also have different values if you set them explicitly with JoinWindows#before() and JoinWindows#after().
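To make the predicate concrete, here is a plain-Java model of it: an in-memory nested-loop inner join over two finite lists. It is not Kafka Streams code (a real join is incremental, per partition, and backed by window stores), and the Event type and join() helper are hypothetical. The first case mirrors JoinWindows.of(joinWindowSizeMs) with before == after == 5 minutes. The second uses different before and after bounds under one possible reading of the question's 10-day/30-day scenario: each kstream1 record matches kstream2 records up to 30 days older (before) and up to 10 days newer (after), which is the same as each kstream2 record looking back 10 days into kstream1.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the SQL-like join predicate; NOT Kafka Streams code.
class WindowJoinModel {
    static final long MINUTE_MS = 60L * 1000L;
    static final long DAY_MS = 24L * 60L * MINUTE_MS;

    // Hypothetical stand-in for a keyed, timestamped record.
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Equal keys and stream1.ts - before <= stream2.ts <= stream1.ts + after.
    static boolean matches(Event e1, Event e2, long beforeMs, long afterMs) {
        return e1.key.equals(e2.key)
                && e1.ts - beforeMs <= e2.ts
                && e2.ts <= e1.ts + afterMs;
    }

    // Nested-loop inner join over two finite lists; emits "v1+v2" per matching pair.
    static List<String> join(List<Event> s1, List<Event> s2, long beforeMs, long afterMs) {
        List<String> out = new ArrayList<>();
        for (Event e1 : s1) {
            for (Event e2 : s2) {
                if (matches(e1, e2, beforeMs, afterMs)) {
                    out.add(e1.value + "+" + e2.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Symmetric window, as with JoinWindows.of(joinWindowSizeMs): 5 min each way.
        long w = 5L * MINUTE_MS;
        List<Event> s1 = List.of(new Event("A", 10L * MINUTE_MS, "a1"));
        List<Event> s2 = List.of(
                new Event("A", 4L * MINUTE_MS, "x"),   // 6 min earlier: outside
                new Event("A", 5L * MINUTE_MS, "y"),   // 5 min earlier: inside (inclusive)
                new Event("A", 15L * MINUTE_MS, "z"),  // 5 min later: inside (inclusive)
                new Event("B", 10L * MINUTE_MS, "k")); // other key: never joins
        System.out.println(join(s1, s2, w, w)); // prints [a1+y, a1+z]

        // Asymmetric window (assumed reading of the question): 30 days back, 10 days ahead.
        List<Event> t1 = List.of(new Event("A", 100L * DAY_MS, "a1"));
        List<Event> t2 = List.of(
                new Event("A", 69L * DAY_MS, "d69"),    // 31 days earlier: outside
                new Event("A", 70L * DAY_MS, "d70"),    // 30 days earlier: inside
                new Event("A", 110L * DAY_MS, "d110"),  // 10 days later: inside
                new Event("A", 111L * DAY_MS, "d111")); // 11 days later: outside
        System.out.println(join(t1, t2, 30L * DAY_MS, 10L * DAY_MS)); // prints [a1+d70, a1+d110]
    }
}
```

Both bounds are inclusive here, matching the <= comparisons in the SQL sketch.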

The retention time of the source topics is completely independent of the specified windowRetentionTimeMs, which applies to the changelog topics (and the local window stores behind them) that Kafka Streams itself creates for the join. Window retention allows out-of-order records, i.e., records that arrive late, to be joined with each other (keep in mind that Kafka guarantees ordering by offset, but with regard to timestamps, records can be out of order).
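The effect of window retention on out-of-order data can be illustrated with a deliberately simplified, single-key model. It is not how Kafka Streams is implemented internally: the RetentionModel class, its eviction rule, and the choice to buffer only stream2 records are assumptions made for illustration. Stream time is the largest timestamp seen so far; buffered records older than stream time minus the retention are evicted, so a late stream1 record can still join only while its partner has not been evicted. In Kafka 2.1 and later, this late-arrival allowance is expressed through the join window's grace period rather than until().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-key model, NOT Kafka Streams internals: only stream2 records are
// buffered here, whereas a real stream-stream join buffers both sides in window stores.
class RetentionModel {
    final long windowMs;     // symmetric join window (before == after)
    final long retentionMs;  // how long buffered stream2 records are kept
    final List<Long> buffered = new ArrayList<>(); // retained stream2 timestamps
    long streamTime = Long.MIN_VALUE;              // largest timestamp seen so far

    RetentionModel(long windowMs, long retentionMs) {
        this.windowMs = windowMs;
        this.retentionMs = retentionMs;
    }

    // Stream time only moves forward; records older than streamTime - retentionMs are evicted.
    private void advance(long ts) {
        streamTime = Math.max(streamTime, ts);
        buffered.removeIf(t -> t < streamTime - retentionMs);
    }

    // A stream2 record is buffered unless it is already older than the retention allows.
    void onStream2(long ts) {
        advance(ts);
        if (ts >= streamTime - retentionMs) {
            buffered.add(ts);
        }
    }

    // A stream1 record joins every retained stream2 record inside its window,
    // even if it arrives late (ts below the current stream time).
    int onStream1(long ts) {
        advance(ts);
        int matches = 0;
        for (long t2 : buffered) {
            if (ts - windowMs <= t2 && t2 <= ts + windowMs) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        RetentionModel m = new RetentionModel(5 * min, 60 * min);
        m.onStream2(10 * min);
        m.onStream2(70 * min);                     // stream time jumps to minute 70
        System.out.println(m.onStream1(12 * min)); // late record, partner still retained: prints 1
        m.onStream2(80 * min);                     // minute-10 record falls out of retention
        System.out.println(m.onStream1(12 * min)); // same late timestamp, partner evicted: prints 0
    }
}
```

The same late timestamp (minute 12) produces a match the first time and none the second, purely because stream time advanced past the retention boundary in between.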
