Google数据流根据输入写入多个表 [英] Google dataflow write to mutiple tables based on input

查看:87
本文介绍了Google数据流根据输入写入多个表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一些日志正试图推送到Google BigQuery.我正在尝试使用google dataflow构建整个管道.日志结构不同,可以分为四种不同的类型.在我的管道中,我从PubSub中读取日志以对其进行分析并将其写入BigQuery表.日志需要写入的表取决于日志中的一个参数.问题是我陷入了如何在运行时更改BigQueryIO.Write的TableName的问题上.

I have logs which I am trying to push to Google BigQuery. I am trying to build the entire pipeline using google dataflow. The log structure is different and can be classified into four different type. In my pipeline I read logs from PubSub parse it and write to BigQuery table. The table to which the logs need to written is depending on one parameter in logs. The problem is I am stuck on a point where how to change TableName for BigQueryIO.Write at runtime.

推荐答案

您可以使用侧面输出.

https://cloud.google.com/dataflow/model/par-do#itter-to-side-outputs-in-your-dofn

以下示例代码,读取BigQuery表并将其拆分为3个不同的PCollections.每个PCollection最终发送到一个不同的Pub/Sub主题(可以使用不同的BigQuery表).

The following sample code, reads a BigQuery table and splits it in 3 different PCollections. Each PCollections ends up sent to a different Pub/Sub topic (which could be different BigQuery tables instead).

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

final TupleTag<String> readings2010 = new TupleTag<String>() {
};
final TupleTag<String> readings2000plus = new TupleTag<String>() {
};
final TupleTag<String> readingsOld = new TupleTag<String>() {
};

PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string")
        .withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld))
        .of(new DoFn<TableRow, String>() {
            @Override
            public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {

                if (c.element().getF().get(2).getV().equals("2010")) {
                    c.output(c.element().toString());
                } else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) {
                    c.sideOutput(readings2000plus, c.element().toString());
                } else {
                    c.sideOutput(readingsOld, c.element().toString());
                }

            }
        }));
collectionTuple.get(readings2010)
        .apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1"));
collectionTuple.get(readings2000plus)
        .apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2"));
collectionTuple.get(readingsOld)
        .apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3"));

p.run();

这篇关于Google数据流根据输入写入多个表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆