Dynamic table name when writing to BQ from Dataflow pipelines

Question

As a follow-up to the following question and answer:

> https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

I'd like to confirm with the Google Dataflow engineering team (@jkff) whether the third option proposed by Eugene is possible at all with Google Dataflow:

"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"

My understanding is that a ParDo/DoFn processes one element at a time. How could we specify a table name (a function of the keys passed in as side inputs) when writing out from the processElement method of a ParDo/DoFn?

Thanks.

Update: here is a DoFn I tried. It obviously does not work, because c.element().getValue() is an Iterable<String>, not a PCollection.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

    private final PCollectionView<List<String>> keysAsSideinputs;

    public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

    @Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        // This does NOT work: c.element().getValue() is an Iterable<String>, not a
        // PCollection, so transforms cannot be applied to it inside a DoFn.
        // How could we write the values out to a sink, be it a GCS file or a BQ table?
        c.element().getValue().apply(ParDo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }
}

Recommended answer

The BigQueryIO.Write transform does not support choosing the table per element. The closest thing you can do is to use per-window tables, and encode whatever information you need to select the table in the window objects by using a custom WindowFn.
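
To make the per-window idea concrete, here is a plain-Java model with no Dataflow dependency. The names `KeyedWindow` and `KeyedWindowing`, the fixed window size, and the `project:dataset.table` naming scheme are all hypothetical. It only illustrates the two pieces the answer describes: the windowing step stores the routing key inside the window object, and the table-selection function reads it back out. In a real pipeline the window would be a `BoundedWindow` subclass produced by a custom `WindowFn`, and the function would be handed to whatever per-window table option your SDK version's `BigQueryIO.Write` provides (check its reference documentation).

```java
import java.util.Objects;
import java.util.function.Function;

/**
 * Plain-Java model of a window that carries a routing key alongside its time
 * bounds. In a real pipeline this would be a custom BoundedWindow subclass
 * (hypothetical; not part of the original answer).
 */
final class KeyedWindow {
    final long startMillis;
    final long endMillis;
    final String tableKey;

    KeyedWindow(long startMillis, long endMillis, String tableKey) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
        this.tableKey = Objects.requireNonNull(tableKey);
    }

    // Value-based equality: elements with the same key and time range
    // must end up in the same window.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KeyedWindow)) return false;
        KeyedWindow w = (KeyedWindow) o;
        return startMillis == w.startMillis
                && endMillis == w.endMillis
                && tableKey.equals(w.tableKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(startMillis, endMillis, tableKey);
    }
}

/** Hypothetical helpers standing in for a custom WindowFn and a table-spec function. */
final class KeyedWindowing {
    private KeyedWindowing() {}

    // Assigns an element (routing key + event timestamp) to a fixed-size window
    // tagged with the key, roughly what a custom WindowFn's assignment step would do.
    static KeyedWindow assign(String key, long timestampMillis, long sizeMillis) {
        long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        return new KeyedWindow(start, start + sizeMillis, key);
    }

    // Builds a "project:dataset.table" spec from the key stored in the window.
    // Conservatively replaces anything other than letters, digits and '_' with '_'.
    static Function<KeyedWindow, String> tableSpecFn(String project, String dataset) {
        return w -> project + ":" + dataset + "."
                + w.tableKey.replaceAll("[^A-Za-z0-9_]", "_");
    }
}
```

The window type needs value-based `equals`/`hashCode` because two elements with the same key and time range must be grouped into the same window; the key is what makes windows for different tables distinct even when their time bounds coincide.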

If you don't want to do that, you can make BigQuery API calls directly from your DoFn. This way you can set the table name to anything your code computes: it could be looked up from a side input, or computed directly from the element the DoFn is currently processing. To avoid making too many small calls to BigQuery, you can buffer the requests and send them in batches, flushing whatever is left in finishBundle().
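
The batching pattern can be sketched in plain Java as well. `BatchingTableWriterFn` only mirrors the shape of a DoFn (one call per element, one `finishBundle()` call per bundle) without depending on the SDK, and `TableWriter` is a hypothetical interface standing in for the real client call; in an actual pipeline its `insertAll` would wrap a BigQuery streaming insert (the `tabledata.insertAll` REST method). The `tableForKey` map plays the role of a side input, the fallback naming scheme is an assumption, and `maxBatchSize` is an illustrative knob, not a documented limit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the real client call (e.g. a BigQuery streaming insert). */
interface TableWriter {
    void insertAll(String tableSpec, List<String> rows);
}

/**
 * Mirrors the DoFn lifecycle without the SDK: processElement() buffers rows per
 * destination table and flushes full batches; finishBundle() flushes the rest.
 */
final class BatchingTableWriterFn {
    private final Map<String, String> tableForKey; // plays the role of a side input
    private final String defaultDataset;
    private final int maxBatchSize;
    private final TableWriter writer;
    private final Map<String, List<String>> buffers = new LinkedHashMap<>();

    BatchingTableWriterFn(Map<String, String> tableForKey, String defaultDataset,
                          int maxBatchSize, TableWriter writer) {
        if (maxBatchSize < 1) throw new IllegalArgumentException("maxBatchSize must be >= 1");
        this.tableForKey = tableForKey;
        this.defaultDataset = defaultDataset;
        this.maxBatchSize = maxBatchSize;
        this.writer = writer;
    }

    // Called once per element: pick the table, buffer the row, flush if the batch is full.
    void processElement(String key, String row) {
        String table = tableFor(key);
        List<String> batch = buffers.computeIfAbsent(table, t -> new ArrayList<>());
        batch.add(row);
        if (batch.size() >= maxBatchSize) {
            flush(table);
        }
    }

    // Called once at the end of a bundle: send every partially filled batch.
    void finishBundle() {
        for (String table : new ArrayList<>(buffers.keySet())) {
            flush(table);
        }
    }

    // Looks the key up in the side-input map first, else derives a table name from it.
    String tableFor(String key) {
        String mapped = tableForKey.get(key);
        if (mapped != null) {
            return mapped;
        }
        return defaultDataset + "." + key.replaceAll("[^A-Za-z0-9_]", "_");
    }

    // Removes the buffer for one table and sends it, if non-empty.
    private void flush(String table) {
        List<String> rows = buffers.remove(table);
        if (rows != null && !rows.isEmpty()) {
            writer.insertAll(table, rows);
        }
    }
}
```

Flushing both when a buffer fills and in `finishBundle()` keeps each request bounded while guaranteeing nothing stays buffered after the bundle ends. A bundle that fails and is retried may send some rows again, so a real writer may need to deduplicate (for example with per-row insert IDs).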

You can see how the Dataflow runner does the streaming import here: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java