Apply side input to BigQueryIO.read operation in Apache Beam


Question

Is there a way to apply a side input to a BigQueryIO.read() operation in Apache Beam?

Say, for example, I have a value in a PCollection that I want to use in a query to fetch data from a BigQuery table. Is this possible using a side input? Or should something else be used in such a case?

I used NestedValueProvider in a similar case, but I guess we can only use that when a certain value depends on a runtime value. Or can I use the same thing here? Please correct me if I'm wrong.
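For reference, a minimal sketch of the NestedValueProvider pattern mentioned above, assuming a hypothetical MyOptions interface with a getInputTable() runtime option, and with options and pipeline already in scope:

// Hypothetical sketch, not from the original post: build a full table spec from a
// runtime-provided table name using NestedValueProvider.
ValueProvider<String> tableSpec = NestedValueProvider.of(
    options.as(MyOptions.class).getInputTable(),
    new SerializableFunction<String, String>() {
        @Override
        public String apply(String tableName) {
            return "projectID:DatasetID." + tableName;
        }
    });

PCollection<TableRow> rows = pipeline.apply("Read from BigQuery",
    BigQueryIO.read().from(tableSpec));

This works when the value comes from pipeline options at run time, which is why it does not cover the case where the value only exists inside a PCollection.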

The code I tried:

Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
Tabledata tableRequest = bigQueryClient.tabledata();

PCollection<TableRow> existingData = readData.apply("Read existing data", ParDo.of(new DoFn<String, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID", "DatasetID", tableName).execute();

        for (TableRow row : table.getRows()) {
            c.output(row);
        }
    }
}).withSideInputs(bqDataView));

The error I get is:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
    at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 4 more

Answer

The Beam model does not currently support this kind of data-dependent operation very well.

A way of doing it is to write your own DoFn that receives the side input and connects directly to BigQuery. Unfortunately, this would not give you any parallelism, as the DoFn would run entirely on the same thread.

Once Splittable DoFns are supported in Beam, this will be a different story.

In the current state of the world, you would need to use the BigQuery client library to add code that queries BigQuery as if you were not in a Beam pipeline.

Given the code in your question, a rough idea of how to implement this is the following. Note that the BigQuery client is created inside the DoFn's @Setup method rather than on the driver, so it is never captured when the DoFn is serialized, which is what caused the NotSerializableException in your code:

class ReadDataDoFn extends DoFn<String, TableRow> {
    // The side input view is passed in through the constructor so it can be read in processElement.
    private final PCollectionView<List<TableRow>> bqDataView;

    // Transient because these clients are not serializable; they are created on the worker in @Setup.
    private transient Bigquery bigQueryClient;
    private transient Tabledata tableRequest;

    ReadDataDoFn(PCollectionView<List<TableRow>> bqDataView) {
        this.bqDataView = bqDataView;
    }

    private Bigquery createBigQueryClientWithinDoFn() {
        // I'm not sure how you'd implement this, but you had the right idea
        // (placeholder so the sketch compiles; plug in your newBigQueryClient helper here).
        throw new UnsupportedOperationException("Create the Bigquery client here");
    }

    @Setup
    public void setup() {
        bigQueryClient = createBigQueryClientWithinDoFn();
        tableRequest = bigQueryClient.tabledata();
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID", "DatasetID", tableName).execute();

        for (TableRow row : table.getRows()) {
            c.output(row);
        }
    }
}

PCollection<TableRow> existingData = readData.apply("Read existing data",
    ParDo.of(new ReadDataDoFn(bqDataView)).withSideInputs(bqDataView));
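For completeness, one possible way to produce the bqDataView side input referenced in both snippets is a List-valued view over whatever PCollection holds the configuration row with the table name; the configRows name below is an assumption, not something from the original post:

// Hypothetical: turn the configuration rows into a side input view usable with withSideInputs.
// "configRows" stands in for the upstream PCollection<TableRow> that contains the table name.
PCollectionView<List<TableRow>> bqDataView = configRows.apply(View.<TableRow>asList());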

