Confused about intervalJoin

Question

I'm trying to come up with a solution which involves applying some logic after the join operation to pick one event from streamB among multiple EventBs. It would be like a reduce function but it only returns 1 element instead of doing it incrementally. So the end result would be a single (EventA, EventB) pair instead of a cross product of 1 EventA and multiple EventB.

streamA
  .keyBy((a: EventA) => a.common_key)
  .intervalJoin(streamB.keyBy((b: EventB) => b.common_key))
  .between(Time.days(-30), Time.days(0))
  .process(new MyJoinFunction)

The data would be ingested like (assuming they have the same key):

EventB ts: 1616686386000
EventB ts: 1616686387000
EventB ts: 1616686388000
EventB ts: 1616686389000
EventA ts: 1616686390000

Each EventA key is guaranteed to arrive only once.

Assume a join like the one above produced 1 EventA with 4 EventBs, all successfully joined and collected in MyJoinFunction. What I want is to access these values all at once and apply some logic to match the EventA to exactly one EventB. For example, for the above dataset I need (EventA 1616686390000, EventB 1616686387000).

MyJoinFunction will be invoked for each (EventA, EventB) pair, but I'd like an operation after this that gives me an iterator, so I can look through all EventB events for each EventA.

I am aware that I can apply another windowing operation after the join to group all the pairs, but I want this to happen immediately after the join succeeds. So if possible, I'd like to avoid adding another window since my window is already large (30 days).

Is Flink the correct choice for this use case, or am I completely in the wrong?

Recommended answer

This could be implemented as a KeyedCoProcessFunction. You would key both streams by their common key, connect them, and process both streams together.

You can use ListState to store the events from B (for a given key), and ValueState for A (again, for a given key). You can use an event time timer to know when the time has come to look through the B events in the ListState, and produce your result. Don't forget to clear the state once you are finished with it.
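A minimal sketch of what that could look like, assuming Flink 1.x with the Scala DataStream API and event-time timestamps taken from a ts field. The EventA/EventB case classes, the PickOneB class name, the state names, and the choose rule are all made up for illustration; the question doesn't say how the one EventB should be picked, so choose simply takes the latest B inside the 30-day window as a placeholder.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Hypothetical event types; the real ones are not shown in the question.
case class EventA(common_key: String, ts: Long)
case class EventB(common_key: String, ts: Long)

object PickOneB {
  val ThirtyDaysMs: Long = 30L * 24 * 60 * 60 * 1000

  // Placeholder rule (an assumption): latest B within [a.ts - 30 days, a.ts].
  // Replace the body with the actual matching logic.
  def choose(a: EventA, bs: Iterable[EventB]): Option[EventB] = {
    val inWindow = bs.filter(b => b.ts <= a.ts && b.ts >= a.ts - ThirtyDaysMs)
    if (inWindow.isEmpty) None else Some(inWindow.maxBy(_.ts))
  }

  // Wiring: key both streams by the common key, connect them, process together.
  def wire(streamA: DataStream[EventA], streamB: DataStream[EventB]): DataStream[(EventA, EventB)] =
    streamA
      .keyBy((a: EventA) => a.common_key)
      .connect(streamB.keyBy((b: EventB) => b.common_key))
      .process(new PickOneB)
}

class PickOneB extends KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)] {
  private type Fn = KeyedCoProcessFunction[String, EventA, EventB, (EventA, EventB)]

  @transient private var bEvents: ListState[EventB] = _
  @transient private var aEvent: ValueState[EventA] = _

  override def open(parameters: Configuration): Unit = {
    bEvents = getRuntimeContext.getListState(
      new ListStateDescriptor[EventB]("b-events", classOf[EventB]))
    aEvent = getRuntimeContext.getState(
      new ValueStateDescriptor[EventA]("a-event", classOf[EventA]))
  }

  // EventA: remember it and ask to be called back once the watermark reaches a.ts,
  // i.e. when no more on-time B events with ts <= a.ts are expected.
  override def processElement1(a: EventA, ctx: Fn#Context, out: Collector[(EventA, EventB)]): Unit = {
    aEvent.update(a)
    ctx.timerService().registerEventTimeTimer(a.ts)
  }

  // EventB: just buffer it for this key.
  override def processElement2(b: EventB, ctx: Fn#Context, out: Collector[(EventA, EventB)]): Unit =
    bEvents.add(b)

  // Timer: look through all buffered B events at once, emit at most one pair, clear state.
  override def onTimer(timestamp: Long, ctx: Fn#OnTimerContext, out: Collector[(EventA, EventB)]): Unit = {
    val buffered = Option(bEvents.get()).map(_.asScala.toList).getOrElse(Nil)
    Option(aEvent.value()).foreach { a =>
      PickOneB.choose(a, buffered).foreach(b => out.collect((a, b)))
    }
    bEvents.clear()
    aEvent.clear()
  }
}
```

One thing this sketch does not handle: B events for a key that never sees an EventA, or that arrive after the timer has fired, stay in state. Depending on your data you would probably want a cleanup timer or state TTL for those.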

If you're not familiar with this part of the Flink API, the tutorial on Process Functions should be helpful.
