Confused about intervalJoin

Question

I'm trying to come up with a solution that applies some logic after the join operation to pick one event from streamB among multiple EventBs. It would be like a reduce function, except that it returns a single element once all candidates are available instead of reducing incrementally. The end result would be a single (EventA, EventB) pair instead of a cross product of 1 EventA and multiple EventBs.

streamA
      .keyBy((a: EventA) => a.common_key)
      .intervalJoin(
          streamB
            .keyBy((b: EventB) => b.common_key)
        )
      .between(Time.days(-30), Time.days(0))
      .process(new MyJoinFunction)

The data would be ingested in this order (assuming all events have the same key):

EventB ts:1616686386000
EventB ts:1616686387000
EventB ts:1616686388000
EventB ts:1616686389000
EventA ts:1616686390000

Each EventA key is guaranteed to arrive only once.

Assume the join above produced 1 EventA with 4 EventBs, all successfully joined and collected in MyJoinFunction. What I want is to access these values at once and apply some logic to match the EventA to exactly one EventB. For example, for the above dataset I need (EventA 1616686390000, EventB 1616686387000).

MyJoinFunction is invoked once for each (EventA, EventB) pair, but I'd like an operation after it that gives me an iterator, so I can look through all the EventB events for each EventA.
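To make the per-pair behaviour concrete, here is a minimal sketch of what MyJoinFunction might look like. The question does not show its body, so the EventA and EventB case classes, their ts field, and the tuple output type are all assumptions made for illustration.

```scala
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.util.Collector

// Hypothetical event types: the question only shows the common_key field;
// ts is an assumed field holding the event timestamp in milliseconds.
case class EventA(common_key: String, ts: Long)
case class EventB(common_key: String, ts: Long)

// Assumed shape of MyJoinFunction. processElement is called once per
// matching (EventA, EventB) pair, so it never sees all EventBs together.
class MyJoinFunction extends ProcessJoinFunction[EventA, EventB, (EventA, EventB)] {
  override def processElement(
      a: EventA,
      b: EventB,
      ctx: ProcessJoinFunction[EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    // With the sample data this emits four pairs, one per EventB.
    out.collect((a, b))
  }
}
```

Because each call only receives one pair, any "pick exactly one EventB" logic has to live in a later operator or in a different kind of function.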

I am aware that I can apply another windowing operation after the join to group all the pairs, but I want this to happen immediately after the join succeeds. So if possible, I'd like to avoid adding another window since my window is already large (30 days).

Is Flink the correct choice for this use case, or am I completely on the wrong track?

Recommended answer

This could be implemented as a KeyedCoProcessFunction. You would key both streams by their common key, connect them, and process both streams together.

You can use ListState to store the events from B (for a given key), and ValueState for A (again, for a given key). You can use an event time timer to know when the time has come to look through the B events in the ListState, and produce your result. Don't forget to clear the state once you are finished with it.
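The approach described above could be sketched roughly as follows. This is an illustration under stated assumptions, not a definitive implementation: it assumes the Flink 1.x Scala API, hypothetical EventA and EventB case classes whose ts field matches the assigned event-time timestamp, and a placeholder choose rule (latest EventB in range) that stands in for the asker's own selection logic, which the question does not specify.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical event types; ts is assumed to equal the event-time timestamp.
case class EventA(common_key: String, ts: Long)
case class EventB(common_key: String, ts: Long)

class PickOneB
    extends KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)] {

  // Same 30-day lookback as between(Time.days(-30), Time.days(0)).
  private val lookbackMs: Long = 30L * 24 * 60 * 60 * 1000

  // Keyed state: one EventA and a list of EventBs per key.
  private lazy val aState: ValueState[EventA] = getRuntimeContext.getState(
    new ValueStateDescriptor[EventA]("event-a", classOf[EventA]))
  private lazy val bState: ListState[EventB] = getRuntimeContext.getListState(
    new ListStateDescriptor[EventB]("events-b", classOf[EventB]))

  override def processElement1(
      a: EventA,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    aState.update(a)
    // Fires once the watermark passes a.ts, i.e. when no more on-time
    // EventBs with ts <= a.ts are expected.
    ctx.timerService().registerEventTimeTimer(a.ts)
  }

  override def processElement2(
      b: EventB,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#Context,
      out: Collector[(EventA, EventB)]): Unit = {
    // Buffer every EventB. Expiring EventBs for keys whose EventA never
    // arrives is left out of this sketch and would need its own timer.
    bState.add(b)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]#OnTimerContext,
      out: Collector[(EventA, EventB)]): Unit = {
    val a = aState.value()
    if (a != null) {
      val buffered = Option(bState.get()).map(_.asScala.toSeq).getOrElse(Seq.empty)
      val candidates = buffered.filter(b => b.ts >= a.ts - lookbackMs && b.ts <= a.ts)
      choose(a, candidates).foreach(b => out.collect((a, b)))
    }
    // Each EventA key arrives only once, so the state can be dropped here.
    aState.clear()
    bState.clear()
  }

  // Placeholder rule (an assumption): pick the latest EventB in range.
  private def choose(a: EventA, candidates: Seq[EventB]): Option[EventB] =
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.ts))
}

// Possible wiring, assuming streamA and streamB from the question and
// import org.apache.flink.streaming.api.scala._ in scope:
//   streamA.keyBy(_.common_key)
//     .connect(streamB.keyBy(_.common_key))
//     .process(new PickOneB)
```

Unlike the interval join, this emits at most one pair per EventA, and it emits only when the timer fires rather than on each matching EventB.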

If you're not familiar with this part of the Flink API, the tutorial on Process Functions should be helpful.
