Confused about intervalJoin
Problem description
I'm trying to come up with a solution that applies some logic after the join operation to pick one event from streamB among multiple EventBs. It would be like a reduce function, but it returns only 1 element instead of working incrementally. So the end result would be a single (EventA, EventB) pair instead of a cross product of 1 EventA and multiple EventBs.
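    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    streamA
      .keyBy((a: EventA) => a.common_key)
      .intervalJoin(
        streamB.keyBy((b: EventB) => b.common_key)
      )
      .between(Time.days(-30), Time.days(0))
      .process(new MyJoinFunction)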
The data would be ingested like this (assuming all events have the same key):
EventB ts:1616686386000
EventB ts:1616686387000
EventB ts:1616686388000
EventB ts:1616686389000
EventA ts:1616686390000
For a given key, the EventA is guaranteed to arrive only once.
Assume a join operation like the one above generates 1 EventA with 4 EventBs, all successfully joined and collected in MyJoinFunction. Now what I want to do is access these values at once and apply some logic to match the EventA to exactly one EventB.
For example, for the above dataset I need (EventA 1616686390000, EventB 1616686387000).
MyJoinFunction will be invoked for each (EventA, EventB) pair, but I'd like an operation after this that gives me an iterator, so I can look through all EventB events for each EventA.
I am aware that I can apply another windowing operation after the join to group all the pairs, but I want this to happen immediately after the join succeeds. So if possible, I'd like to avoid adding another window since my window is already large (30 days).
Is Flink the right choice for this use case, or am I completely wrong?
Recommended answer
This could be implemented as a KeyedCoProcessFunction. You would key both streams by their common key, connect them, and process both streams together.
You can use ListState to store the events from B (for a given key), and ValueState for A (again, for a given key). You can use an event-time timer to know when the time has come to look through the B events in the ListState and produce your result. Don't forget to clear the state once you are finished with it.
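A minimal sketch of that approach, assuming the Flink 1.x Scala DataStream API (flink-streaming-scala) as in the question. The names `PickOneBJob`, `PickOneB` and `choose`, the `ts` field, and the placeholder rule inside `choose` are hypothetical; the timer is registered at the EventA's timestamp, so it fires once the watermark says every EventB with `ts <= a.ts` should have arrived.

```scala
import java.time.Duration

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object PickOneBJob {
  // Hypothetical event shapes: `common_key` is from the question, `ts` (epoch millis) is assumed.
  case class EventA(common_key: String, ts: Long)
  case class EventB(common_key: String, ts: Long)

  class PickOneB(lookbackMs: Long)
      extends KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)] {

    // Per-key state: the single A event (ValueState) and every buffered B event (ListState).
    @transient private var aState: ValueState[EventA] = _
    @transient private var bState: ListState[EventB] = _

    override def open(parameters: Configuration): Unit = {
      aState = getRuntimeContext.getState(new ValueStateDescriptor[EventA]("a", classOf[EventA]))
      bState = getRuntimeContext.getListState(new ListStateDescriptor[EventB]("bs", classOf[EventB]))
    }

    // Placeholder selection rule (hypothetical): replace with the real matching logic.
    def choose(a: EventA, candidates: Seq[EventB]): EventB = candidates.maxBy(_.ts)

    // A arrives once per key: store it and ask to be woken when the watermark reaches a.ts.
    override def processElement1(
        a: EventA,
        ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
        out: Collector[(EventA, EventB)]): Unit = {
      aState.update(a)
      ctx.timerService().registerEventTimeTimer(a.ts)
    }

    // B arrives: just buffer it for this key.
    override def processElement2(
        b: EventB,
        ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
        out: Collector[(EventA, EventB)]): Unit =
      bState.add(b)

    // Timer fired: look at all buffered B events at once, emit one pair, then clear the state.
    override def onTimer(
        timestamp: Long,
        ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#OnTimerContext,
        out: Collector[(EventA, EventB)]): Unit = {
      val a = aState.value()
      if (a != null) {
        val candidates = bState.get().asScala.toList
          .filter(b => b.ts >= a.ts - lookbackMs && b.ts <= a.ts)
        if (candidates.nonEmpty) out.collect((a, choose(a, candidates)))
      }
      aState.clear()
      bState.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamA = env.fromElements(EventA("k", 1616686390000L)).assignAscendingTimestamps(_.ts)
    val streamB = env
      .fromElements(
        EventB("k", 1616686386000L), EventB("k", 1616686387000L),
        EventB("k", 1616686388000L), EventB("k", 1616686389000L))
      .assignAscendingTimestamps(_.ts)

    streamA.keyBy(_.common_key)
      .connect(streamB.keyBy(_.common_key))
      .process(new PickOneB(Duration.ofDays(30).toMillis))
      .print()

    env.execute("pick-one-b")
  }
}
```

Because the state is cleared only when an EventA's timer fires, EventBs for keys whose EventA never arrives (or that arrive after it fired) would stay in state indefinitely; a real job would likely need extra cleanup timers or state TTL for that.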
If you're not familiar with this part of the Flink API, the tutorial on Process Functions should be helpful.