How to combine streaming data with large history data set in Dataflow/Beam
Question
I am investigating processing logs from web user sessions via Google Dataflow/Apache Beam and need to combine the user's logs as they come in (streaming) with the history of a user's session from the last month.
I have looked into the following approaches:
- Use a 30-day fixed window: most likely too large a window to fit into memory, and I do not need to update the user's history, just refer to it
- Use CoGroupByKey to join two data sets, but the two data sets must have the same window size (https://cloud.google.com/dataflow/model/group-by-key#join), which isn't true in my case (24h vs 30 days)
- Use a side input to retrieve the user's session history for a given element in processElement(ProcessContext processContext)
My understanding is that the data loaded via .withSideInputs(pCollectionView) needs to fit into memory. I know I can fit all of a single user's session history into memory, but not all session histories.
My question is whether there is a way to load/stream data from a side input that is only relevant to the current user session.
I am imagining a ParDo function that would load the user's session history from the side input by specifying the user's ID. But only the current user's session history would fit into memory; loading all session histories through the side input would be too large.
Some pseudo code to illustrate:
public static class MetricFn extends DoFn<LogLine, String> {

    // View over the history of ALL users, keyed by user ID (the part that does not fit into memory).
    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        // Only the current user's history is actually needed for this element.
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
Recommended answer
There is currently no way to access per-key side inputs in streaming, but it would definitely be useful exactly as you describe, and it is something we are considering implementing.
One possible workaround is to use the side inputs to distribute pointers to the actual session history. The code generating the 24h session histories could upload them to GCS/BigQuery/etc., then send the locations as a side input to the joining code.
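To make the shape of this workaround concrete, here is a minimal sketch in plain Java with no Beam dependency. It is only an illustration under assumptions: the side input is modeled as a small Map from user ID to a storage location, and HistoryStore, InMemoryHistoryStore, process and the "histories/..." locations are hypothetical names standing in for GCS/BigQuery access and for the body of processElement. None of them are part of the Dataflow or Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the "pointers as side input" workaround (no Beam dependency). */
class PointerSideInputSketch {

    /** Hypothetical stand-in for GCS/BigQuery: resolves a location to one user's history. */
    interface HistoryStore {
        List<String> read(String location);
    }

    /** In-memory HistoryStore that counts reads; for illustration only. */
    static class InMemoryHistoryStore implements HistoryStore {
        private final Map<String, List<String>> blobs = new HashMap<>();
        int reads = 0;

        void put(String location, List<String> history) {
            blobs.put(location, new ArrayList<>(history));
        }

        @Override
        public List<String> read(String location) {
            reads++;
            return blobs.getOrDefault(location, new ArrayList<>());
        }
    }

    /**
     * Models the body of processElement. The side input holds only
     * userId -> location, so it stays small; the history itself is
     * fetched per element, and only for the current user.
     */
    static String process(String userId, String logLine,
                          Map<String, String> locationSideInput,
                          HistoryStore store) {
        String location = locationSideInput.get(userId);
        List<String> history = (location == null)
                ? new ArrayList<>()      // no pointer: user has no stored history
                : store.read(location);  // loads a single user's history
        return userId + ":" + logLine + ":historySize=" + history.size();
    }

    public static void main(String[] args) {
        InMemoryHistoryStore store = new InMemoryHistoryStore();
        store.put("histories/alice", Arrays.asList("login", "view", "logout"));
        store.put("histories/bob", Arrays.asList("login"));

        // The "side input": pointers only, not the histories themselves.
        Map<String, String> pointers = new HashMap<>();
        pointers.put("alice", "histories/alice");
        pointers.put("bob", "histories/bob");

        System.out.println(process("alice", "click", pointers, store));
        System.out.println(process("carol", "click", pointers, store));
    }
}
```

In a real pipeline the pointer map would be the side input view and the read would go to external storage from inside the DoFn, so the cost shifts from worker memory to one storage lookup per element (or per user, if results are cached).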