How to combine streaming data with large history data set in Dataflow/Beam

Question

I am investigating processing logs from web user sessions via Google Dataflow/Apache Beam and need to combine the user's logs as they come in (streaming) with the history of a user's session from the last month.

I have looked into the following approaches:

  1. Use a 30 day fixed window: most likely too large a window to fit into memory, and I do not need to update the user's history, just refer to it
  2. Use CoGroupByKey to join two data sets, but the two data sets must have the same window size (https://cloud.google.com/dataflow/model/group-by-key#join), which isn't true in my case (24h vs 30 days); see the windowing sketch after this list
  3. Use Side Input to retrieve the user's session history for a given element in processElement(ProcessContext processContext)
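To make the mismatch in approach 2 concrete, here is a minimal windowing sketch in the same Dataflow SDK 1.x style as the question's code; keyedStream and keyedHistory are hypothetical PCollection<KV<String, LogLine>> inputs, not names from the question:

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;

// 24h windows on the live stream vs. 30-day windows on the history.
// CoGroupByKey requires both sides to share the same windowing, which is
// exactly the constraint that rules this approach out.
PCollection<KV<String, LogLine>> windowedStream = keyedStream.apply(
    Window.<KV<String, LogLine>>into(FixedWindows.of(Duration.standardHours(24))));

PCollection<KV<String, LogLine>> windowedHistory = keyedHistory.apply(
    Window.<KV<String, LogLine>>into(FixedWindows.of(Duration.standardDays(30))));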

My understanding is that the data loaded via .withSideInputs(pCollectionView) needs to fit into memory. I know I can fit all of a single user's session history into memory, but not all session histories.
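For reference, a side-input map like this would typically be built over the keyed history and attached via withSideInputs. A minimal sketch, assuming an SDK 1.x version where View.asMultimap is available; logLines and keyedHistory are hypothetical PCollections:

import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import java.util.Map;

// Turn the keyed 30-day history into a map view. Each worker materializes
// the whole map in memory, which is the limitation described above.
PCollectionView<Map<String, Iterable<LogLine>>> historyView =
    keyedHistory.apply(View.<String, LogLine>asMultimap());

PCollection<String> metrics = logLines.apply(
    ParDo.withSideInputs(historyView).of(new MetricFn(historyView)));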

My question: is there a way to load/stream data from a side input that is only relevant to the current user session?

I am imagining a parDo function that would load the user's history session from the side input by specifying the user's ID. But only the current user's history session would fit into memory; loading all history sessions through side input would be too large.

Some pseudocode to illustrate:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import java.util.Map;

public static class MetricFn extends DoFn<LogLine, String> {

    // View over the keyed 30-day session history side input.
    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        // The full side-input map is materialized on the worker,
        // not just the entry for the current user.
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}

Answer

There is not currently a way of accessing per-key side inputs in streaming, but it would definitely be useful exactly as you describe, and it is something we are considering implementing.

One possible workaround is to use the side inputs to distribute pointers to the actual session history. The code generating the 24h session histories could upload them to GCS/BigQuery/etc, then send the locations as a side input to the joining code.
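A sketch of what that workaround could look like, reusing the shape of the question's MetricFn: the side input now carries only a small userId-to-location map, and readHistoryFromGcs() is a hypothetical helper (e.g. built on the GCS client library), not an SDK call:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import java.util.Map;

public static class PointerMetricFn extends DoFn<LogLine, String> {

    // Small map of userId -> location of that user's session history file.
    final PCollectionView<Map<String, String>> pHistoryLocationView;

    public PointerMetricFn(PCollectionView<Map<String, String>> locationView) {
        this.pHistoryLocationView = locationView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, String> locations = processContext.sideInput(pHistoryLocationView);
        final LogLine currentLogLine = processContext.element();

        // Only the current user's history is read on demand; the side input
        // itself stays small enough to fit into memory.
        final String historyLocation = locations.get(currentLogLine.getUserId());
        final Iterable<LogLine> userHistory = readHistoryFromGcs(historyLocation);

        processContext.output(calculateMetricWithUserHistory(currentLogLine, userHistory));
    }
}

The trade-off is an extra read per element (or per cached user), in exchange for a side input whose size grows with the number of users rather than the volume of their history.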
