Apply a side input to a BigQueryIO.read operation in Apache Beam
Question
Is there a way to apply a side input to a BigQueryIO.read() operation in Apache Beam?
Say for example I have a value in a PCollection that I want to use in a query to fetch data from a BigQuery table. Is this possible using side input? Or should something else be used in such a case?
I used NestedValueProvider in a similar case, but I think it can only be used when a value is derived from a runtime parameter (a ValueProvider). Or can I use the same approach here? Please correct me if I'm wrong.
Code I tried:
Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
Tabledata tableRequest = bigQueryClient.tabledata();
PCollection<TableRow> existingData = readData.apply("Read existing data", ParDo.of(new DoFn<String, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID", "DatasetID", tableName).execute();
        for (TableRow row : table.getRows()) {
            c.output(row);
        }
    }
}).withSideInputs(bqDataView));
The error I get is:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
... 4 more
Answer
The Beam model does not currently support this kind of data-dependent operation very well.
One way of doing it is to write your own DoFn that receives the side input and connects directly to BQ. Unfortunately, this would not give you any parallelism, as the DoFn would run entirely on the same thread.
Once Splittable DoFns are supported in Beam, this will be a different story.
For now, you need to use the BigQuery client library directly and query BQ as if you were not in a Beam pipeline.
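The key change from the code in your question is that the BigQuery client is created inside the DoFn, in a @Setup method, instead of being built in main() and captured by the anonymous DoFn. Bigquery$Tabledata is not serializable, and that is exactly what the NotSerializableException in the question reports. A rough idea of how to implement this is the following: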
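import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class ReadDataDoFn extends DoFn<String, TableRow> {
    // The side input view is serializable, so it can be passed in as a field.
    private final PCollectionView<List<TableRow>> bqDataView;
    // Bigquery$Tabledata is not serializable: mark it transient and create it in @Setup.
    private transient Bigquery.Tabledata tableRequest;
    ReadDataDoFn(PCollectionView<List<TableRow>> bqDataView) {
        this.bqDataView = bqDataView;
    }
    private Bigquery createBigQueryClientWithinDoFn() throws IOException {
        // I'm not sure how you'd implement this, but you had the right idea
        throw new UnsupportedOperationException("create the Bigquery client here");
    }
    @Setup
    public void setup() throws IOException {
        tableRequest = createBigQueryClientWithinDoFn().tabledata(); // once per DoFn instance
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        String pageToken = null;
        do { // tabledata.list returns results in pages: follow pageToken until it is null
            TableDataList page = tableRequest.list("projectID", "DatasetID", tableName)
                    .setPageToken(pageToken).execute();
            if (page.getRows() != null) { // getRows() is null for an empty table
                for (TableRow row : page.getRows()) {
                    c.output(row);
                }
            }
            pageToken = page.getPageToken();
        } while (pageToken != null);
    }
}

// In main(): register the view with withSideInputs, otherwise c.sideInput() fails at runtime.
PCollection<TableRow> existingData = readData.apply("Read existing data",
        ParDo.of(new ReadDataDoFn(bqDataView)).withSideInputs(bqDataView));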
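You can reproduce the failure in the question, and the reason the @Setup approach works, with plain Java serialization and no Beam at all. Beam serializes a DoFn when it is handed to ParDo.of (see the SerializableUtils.clone frame in the stack trace). Any non-serializable object the DoFn holds as a field therefore triggers NotSerializableException. The anonymous DoFn captured the tableRequest local, so that object became such a field. The minimal sketch below is only an illustration. FakeClient, CapturingFn, SetupFn and setup() are hypothetical stand-ins for Bigquery$Tabledata, the anonymous DoFn, ReadDataDoFn and Beam's @Setup method. None of them is Beam or BigQuery API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientClientDemo {
    // Hypothetical stand-in for Bigquery$Tabledata: it does not implement Serializable.
    static class FakeClient {
        String list(String table) { return "rows of " + table; }
    }

    // Like the anonymous DoFn in the question: the client is held in a regular field.
    static class CapturingFn implements Serializable {
        final FakeClient client = new FakeClient();
    }

    // Like ReadDataDoFn: the field is transient and only filled in by setup(),
    // which plays the role of Beam's @Setup (called after deserialization).
    static class SetupFn implements Serializable {
        private transient FakeClient client;
        void setup() { client = new FakeClient(); }
        String process(String table) { return client.list(table); }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            serialize(new CapturingFn());
            System.out.println("unexpected: CapturingFn serialized");
        } catch (NotSerializableException e) {
            // The message is the name of the offending class, as in the question's trace.
            System.out.println("CapturingFn fails: " + e.getMessage());
        }
        // Serialize, ship, deserialize, then create the client on the "worker" side.
        SetupFn copy = (SetupFn) deserialize(serialize(new SetupFn()));
        copy.setup();
        System.out.println(copy.process("myTable"));
    }
}
```

Running main should first print the NotSerializableException message, which is the FakeClient class name. It should then print "rows of myTable". The transient field is simply skipped during serialization, so nothing non-serializable travels with the function object. The client only exists after setup() runs on the receiving side.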