Writing different values to different BigQuery tables in Apache Beam


Problem description



Suppose I have a PCollection<Foo> and I want to write it to multiple BigQuery tables, choosing a potentially different table for each Foo.

How can I do this using the Apache Beam BigQueryIO API?

Solution

This is possible using a feature recently added to BigQueryIO in Apache Beam.

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));
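The destination function above must return a concrete table spec per element. As a minimal, self-contained sketch of what that routing logic might look like, here is a plain helper that maps a hypothetical per-record event type to a `project:dataset.table` spec (the project and dataset names, and the idea of routing by an event-type field, are illustrative assumptions, not part of the original answer):

```java
// Hypothetical routing helper: maps each record's event type to a BigQuery
// table spec of the form "project:dataset.table". The project ("my-project")
// and dataset ("logs") are illustrative placeholders.
public class TableRouter {

    static String tableSpecFor(String eventType) {
        // BigQuery table names allow letters, digits, and underscores,
        // so sanitize the event type before embedding it in the table name.
        String safe = eventType.replaceAll("[^A-Za-z0-9_]", "_");
        return "my-project:logs.events_" + safe;
    }

    public static void main(String[] args) {
        System.out.println(tableSpecFor("page-view"));
        System.out.println(tableSpecFor("click"));
    }
}
```

A function like `tableSpecFor` would be called from inside the `SerializableFunction` above to compute `tableSpec` for each `Foo`.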

Depending on whether the input PCollection<Foo> is bounded or unbounded, under the hood this will either create multiple BigQuery import jobs (one or more per table, depending on the amount of data) or use the BigQuery streaming inserts API.
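If you do not want the runner to pick the insertion method from boundedness alone, BigQueryIO also lets you request one explicitly. A sketch (assuming a Beam SDK version that exposes `BigQueryIO.Write.Method`; the `...` placeholders mirror the fragment above):

```java
// Sketch: selecting the insertion method explicitly rather than letting the
// runner infer it from whether the collection is bounded.
foos.apply(BigQueryIO.<Foo>write()
    .to(...)                         // destination function, as in the example above
    .withFormatFunction(...)
    .withSchema(...)
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS));  // or STREAMING_INSERTS
```

FILE_LOADS corresponds to the batch import jobs described above; STREAMING_INSERTS uses the streaming API and is typically used for unbounded input.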

The most flexible version of the API uses DynamicDestinations, which allows you to write different values to different tables with different schemas, and even allows you to use side inputs from the rest of the pipeline in all of these computations.
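The DynamicDestinations variant separates "which destination does this element go to" from "what table and schema does that destination have". A hedged sketch, assuming a hypothetical `Foo.getType()` routing key and a hypothetical `schemaForType` lookup (neither is in the original answer):

```java
// Sketch of the DynamicDestinations-based API (requires the Beam SDK on the
// classpath). Each Foo is first mapped to a destination key (here, a String),
// and each key is then mapped to a table and a schema.
foos.apply(BigQueryIO.<Foo>write()
    .to(new DynamicDestinations<Foo, String>() {
      @Override
      public String getDestination(ValueInSingleWindow<Foo> element) {
        return element.getValue().getType();  // hypothetical routing key
      }

      @Override
      public TableDestination getTable(String destination) {
        return new TableDestination(
            "my-project:logs.events_" + destination,  // assumed naming scheme
            "Events of type " + destination);
      }

      @Override
      public TableSchema getSchema(String destination) {
        return schemaForType(destination);  // hypothetical per-key schema lookup
      }
    })
    .withFormatFunction(...));
```

Because `getTable` and `getSchema` are keyed by the destination rather than by individual elements, tables with different schemas can be served from one write transform, and side inputs registered on the DynamicDestinations instance can feed these lookups.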

Additionally, BigQueryIO has been refactored into a number of reusable transforms that you can combine yourself to implement more complex use cases; see the files in the source directory.

This feature will be included in the first stable release of Apache Beam and in the next release of the Dataflow SDK (which will be based on the first stable release of Apache Beam). For now, you can use it by running your pipeline against a snapshot of Beam built from HEAD on GitHub.
