Long lived state with Google Dataflow


Problem description

    Just trying to get my head around the programming model here. Scenario is I'm using Pub/Sub + Dataflow to instrument analytics for a web forum. I have a stream of data coming from Pub/Sub that looks like:

    ID | TS | EventType
    1  | 1  | Create
    1  | 2  | Comment
    2  | 2  | Create
    1  | 4  | Comment
    

    And I want to end up with a stream coming from Dataflow that looks like:

    ID | TS | num_comments
    1  | 1  | 0
    1  | 2  | 1
    2  | 2  | 0
    1  | 4  | 2
    

    I want the job that does this rollup to run as a stream process, with new counts being populated as new events come in. My question is, where is the idiomatic place for the job to store the state for the current topic id and comment counts? Assuming that topics can live for years. Current ideas are:

    • Write a 'current' entry for the topic id to BigTable, and in a DoFn query for the current comment count as each topic id comes in. Even as I write this I'm not a fan.
    • Use side inputs somehow? It seems like maybe this is the answer, but if so I'm not totally understanding.
    • Set up a streaming job with a global window, with a trigger that goes off every time it gets a record, and rely on Dataflow to keep the entire pane history somewhere. (unbounded storage requirement?)

    EDIT: Just to clarify, I wouldn't have any trouble implementing any of these three strategies, or a million other ways of doing it; I'm more interested in what the best way of doing it with Dataflow is. What will be most resilient to failure, to having to re-process history for a backfill, and so on?

    EDIT2: There is currently a bug in the Dataflow service where updates fail if inputs are added to a Flatten transform, which means that if you change a job in a way that adds an input to a Flatten operation, you'll need to discard and rebuild any state accrued in the job.

    Solution

    You should be able to use triggers and a combine to accomplish this.

    PCollection<ID> comments = /* IDs from the source */;
    PCollection<KV<ID, Long>> commentCounts = comments
        // Produce speculative results by triggering as data comes in.
        // Note that this won't trigger after *every* element, but it will
        // trigger relatively quickly (as the system divides incoming data
        // into work units). You could also throttle this with something
        // like:
        //   AfterProcessingTime.pastFirstElementInPane()
        //     .plusDelayOf(Duration.standardMinutes(5))
        // which will produce output every 5 minutes
        .apply(Window.<ID>triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .accumulatingFiredPanes())
        // Count the occurrences of each ID
        .apply(Count.perElement());
    
    // Produce an output String -- in your use case you'd want to produce
    // a row and write it to the appropriate sink
    commentCounts.apply(ParDo.of(new DoFn<KV<ID, Long>, String>() {
      @Override
      public void processElement(ProcessContext c) {
        KV<ID, Long> element = c.element();
        // This includes details about the pane of the window being
        // processed, including a strictly increasing index of the
        // number of panes that have been produced for this key.
        PaneInfo pane = c.pane();
        c.output(element.getKey() + " | " + pane.getIndex()
            + " | " + element.getValue());
      }
    }));
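
    If emitting on every element is too chatty, the throttled variant mentioned in the comments above would look roughly like this (a sketch instantiating the comment's example; the five-minute delay is just the example value):

    .apply(Window.<ID>triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(5))))
        .accumulatingFiredPanes())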
    

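    For completeness, here is one way the comments collection above might be produced. This is a minimal sketch against the Dataflow SDK 1.x API; the topic name, the use of String as the ID type, and parsing the "ID | TS | EventType" line format are assumptions based on the question, not part of the original answer.

    import com.google.cloud.dataflow.sdk.io.PubsubIO;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    
    // ... inside your pipeline construction, with p a streaming Pipeline:
    PCollection<String> events = p.apply(
        PubsubIO.Read.topic("projects/my-project/topics/forum-events"));  // hypothetical topic
    
    PCollection<String> comments = events.apply(ParDo.of(
        new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            // Expect lines like "1 | 4 | Comment"; keep only Comment events.
            String[] fields = c.element().split("\\|");
            if (fields.length == 3 && "Comment".equals(fields[2].trim())) {
              c.output(fields[0].trim());  // emit the topic ID
            }
          }
        }));

    Note that because the windowing above uses accumulatingFiredPanes(), each new pane for a key carries the cumulative count so far, which matches the desired running num_comments output.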
    Depending on your data, you could also read whole comments from the source, extract the ID, and then use Count.perKey() to get the counts for each ID. If you want a more complicated combination, you could look at defining a custom CombineFn and using Combine.perKey.
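
    As a sketch of that last suggestion: a custom CombineFn might look like the following if, say, you wanted the latest event timestamp alongside the comment count. The input type (a Long timestamp), the accumulator layout, and all names here are hypothetical, assuming the Dataflow SDK 1.x Combine API.

    import java.io.Serializable;
    import com.google.cloud.dataflow.sdk.transforms.Combine;
    import com.google.cloud.dataflow.sdk.values.KV;
    
    // Tracks both the comment count and the most recent timestamp per key.
    public static class CountAndLatestFn
        extends Combine.CombineFn<Long, CountAndLatestFn.Accum, KV<Long, Long>> {
    
      public static class Accum implements Serializable {
        long count = 0;
        long latestTs = Long.MIN_VALUE;
      }
    
      @Override
      public Accum createAccumulator() {
        return new Accum();
      }
    
      @Override
      public Accum addInput(Accum acc, Long ts) {
        acc.count++;
        acc.latestTs = Math.max(acc.latestTs, ts);
        return acc;
      }
    
      @Override
      public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        Accum merged = new Accum();
        for (Accum acc : accumulators) {
          merged.count += acc.count;
          merged.latestTs = Math.max(merged.latestTs, acc.latestTs);
        }
        return merged;
      }
    
      @Override
      public KV<Long, Long> extractOutput(Accum acc) {
        return KV.of(acc.count, acc.latestTs);  // (count, latest timestamp)
      }
    }
    
    // Usage, assuming events keyed as KV of topic ID to timestamp:
    //   keyedEvents.apply(Combine.perKey(new CountAndLatestFn()));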
