Flink join streams using a disposable one-time join key

Question

I have a question about joining two streams in Flink. I have two different dataflows that I need to join at some point. Each dataflow is tagged with a unique id, which serves as the join key between the flows. There is no concept of a window, so to connect the two dataflows I do first.connect(second).keyBy(0,0).

This seems to work, as I get the correct results, but I am worried about the long term. I do not explicitly keep any state in the operator (coFlatMap) that does the join, but what happens if, say, one flow (e.g. the first) provides the unique id and the second never provides the matching id? (I assume that for ids that have already been joined, the operator discards any kind of internal state.) Does the memory/state footprint grow constantly, or is there some kind of expiration mechanism?

If that is the case, how can I tackle this problem? Alternatively, can you suggest another approach?

Recommended answer

There are a few approaches to implement this join.

  1. Use a CoProcessFunction. When the first record for a key arrives, you store it in state and register a timer that fires x minutes/hours/days later. When the second record arrives, you perform the join and clear the state. If the second record does not arrive, the onTimer() method will be called when the timer fires. At that point, you can either just clear the state and return (INNER JOIN semantics) or forward the first record padded with a null value (OUTER JOIN semantics), clear the state, and return. The timer serves as a safety net that guarantees the state is removed at some point. How long you wait for the second record to arrive depends on your requirements.

  2. The Table API or SQL provides a time-windowed join (Table API, SQL) that works similarly to what I described in 1. The difference is that the windowed join implementation tries to join all records (i.e., more than one from each input stream) that arrive during the join interval, and hence keeps the state longer. However, once time has passed the join interval, it clears the state.

  3. Flink 1.6.0 (to be released early August 2018) will include an interval join for the DataStream API, which works similarly to the window join of the Table API (similar logic, different name). It also keeps the state longer than the custom implementation, which is based on the assumption that each key appears just once on each side.

I would go for approach 1 because it is more memory efficient and still reasonably easy to implement.
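
To make the buffer, join, and expire logic of approach 1 concrete, here is a minimal plain-Java sketch. It deliberately does not use the Flink API, so it runs without any dependency. The class OneTimeJoin, its methods onFirst/onSecond/onTimer, and the explicit "now" timestamps are all hypothetical names chosen for illustration. The sketch assumes each key appears at most once per side and that record values are never null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of approach 1 (hypothetical names, no Flink dependency).
// Assumes each key appears at most once per side and values are non-null.
final class OneTimeJoin<K, A, B> {

    // A joined pair; one side is null only when a timeout emits an OUTER JOIN row.
    static final class Joined<K, A, B> {
        final K key;
        final A first;
        final B second;

        Joined(K key, A first, B second) {
            this.key = key;
            this.first = first;
            this.second = second;
        }
    }

    private final long timeoutMillis;
    private final boolean outerJoin;
    private final Map<K, A> pendingFirst = new HashMap<>();  // stands in for keyed state
    private final Map<K, B> pendingSecond = new HashMap<>(); // stands in for keyed state
    private final Map<K, Long> deadlines = new HashMap<>();  // stands in for registered timers

    OneTimeJoin(long timeoutMillis, boolean outerJoin) {
        this.timeoutMillis = timeoutMillis;
        this.outerJoin = outerJoin;
    }

    // Record from the first stream: join if the other side is waiting, else buffer it.
    Optional<Joined<K, A, B>> onFirst(K key, A value, long now) {
        B other = pendingSecond.remove(key);
        if (other != null) {
            deadlines.remove(key); // joined: nothing left to expire for this key
            return Optional.of(new Joined<>(key, value, other));
        }
        pendingFirst.put(key, value);
        deadlines.put(key, now + timeoutMillis); // "register a timer"
        return Optional.empty();
    }

    // Record from the second stream: mirror image of onFirst.
    Optional<Joined<K, A, B>> onSecond(K key, B value, long now) {
        A other = pendingFirst.remove(key);
        if (other != null) {
            deadlines.remove(key);
            return Optional.of(new Joined<>(key, other, value));
        }
        pendingSecond.put(key, value);
        deadlines.put(key, now + timeoutMillis);
        return Optional.empty();
    }

    // Fires every timer that is due: always clears the state for that key,
    // and emits a null-padded row only under OUTER JOIN semantics.
    List<Joined<K, A, B>> onTimer(long now) {
        List<Joined<K, A, B>> emitted = new ArrayList<>();
        Iterator<Map.Entry<K, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> timer = it.next();
            if (timer.getValue() > now) {
                continue; // not due yet
            }
            K key = timer.getKey();
            A first = pendingFirst.remove(key);
            B second = pendingSecond.remove(key);
            it.remove();
            if (outerJoin) {
                emitted.add(new Joined<>(key, first, second));
            }
        }
        return emitted;
    }

    // Number of keys still waiting for their partner.
    int bufferedKeys() {
        return deadlines.size();
    }
}
```

In an actual Flink job, the two pending maps would presumably become keyed state inside the CoProcessFunction, the deadlines map would be replaced by timers registered through the function's timer service, and Flink itself would call onTimer(). The point of the sketch is only that every buffered record is removed either by its partner arriving or by its timer firing, so the state cannot grow without bound.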
