Strategy for loading data into BigQuery and Google Cloud Storage from local disk


Problem description



I have two years of combined data, around 300 GB in size, on my local disk, which I extracted from Teradata. I have to load the same data into both Google Cloud Storage and a BigQuery table.

The final data in Google Cloud Storage should be segregated by day in compressed format (each day's data should be a single gz file). I also have to load the data into a day-wise partitioned BigQuery table, i.e. each day's data should be stored in one partition.
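For reference, loading one day's compressed file into a single partition can be done with a load job that targets the partition decorator of a date-partitioned table. Below is a rough sketch with the BigQuery API Java client; the project, dataset, table, and bucket names are placeholders, and the destination table is assumed to already exist as a date-partitioned table:

// Hypothetical sketch (placeholder names): load one day's gz file from GCS
// into the matching partition of a date-partitioned table via a load job.
TableReference dest = new TableReference()
  .setProjectId("my-project")
  .setDatasetId("my_dataset")
  .setTableId("daily_table$20160101"); // the $YYYYMMDD decorator selects the partition

JobConfigurationLoad load = new JobConfigurationLoad()
  .setDestinationTable(dest)
  .setSourceUris(Collections.singletonList("gs://my-bucket/2016-01-01.gz"))
  .setSourceFormat("CSV")
  .setWriteDisposition("WRITE_TRUNCATE");

Bigquery bigquery = ...; // an authorized com.google.api.services.bigquery.Bigquery client
bigquery.jobs()
  .insert("my-project", new Job().setConfiguration(new JobConfiguration().setLoad(load)))
  .execute();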

I loaded the combined two years of data to Google Cloud Storage first. Then I tried using Google Dataflow to segregate the data by day using Dataflow's partitioning concept and write it to Google Cloud Storage (FYI, Dataflow partitioning is different from BigQuery partitioning). But Dataflow did not allow creating 730 partitions (for two years), as it hit the error 413 Request Entity Too Large ("The size of the serialized JSON representation of the pipeline exceeds the allowable limit").

So I ran the Dataflow job twice, once for each year. Each run filtered one year's data and wrote it into separate files in Google Cloud Storage, but it could not compress them, as Dataflow currently cannot write compressed files.

Seeing the first approach fail, I thought of filtering one year's data at a time from the combined data using partitioning in Dataflow as explained above, writing it directly to BigQuery, and then exporting it to Google Cloud Storage in compressed format. This process would have been repeated twice. But with this approach I could not write more than 45 days of data at once, as I repeatedly hit a java.lang.OutOfMemoryError: Java heap space issue. So this strategy also failed.
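For context, the export step in that approach corresponds to a BigQuery extract job with GZIP compression. Here is a rough sketch with the BigQuery API Java client (table and bucket names are placeholders; bigquery is an authorized client):

// Hypothetical sketch (placeholder names): export one day's table from
// BigQuery to a single gzipped CSV file in Google Cloud Storage.
TableReference source = new TableReference()
  .setProjectId("my-project")
  .setDatasetId("my_dataset")
  .setTableId("daily_table_20160101");

JobConfigurationExtract extract = new JobConfigurationExtract()
  .setSourceTable(source)
  .setDestinationUris(Collections.singletonList("gs://my-bucket/2016-01-01.gz"))
  .setDestinationFormat("CSV")
  .setCompression("GZIP");

bigquery.jobs()
  .insert("my-project", new Job().setConfiguration(new JobConfiguration().setExtract(extract)))
  .execute();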

Any help in figuring out a strategy for this date-wise segregated migration to Google Cloud Storage in compressed format and to BigQuery would be greatly appreciated.

Solution

Currently, partitioning the results is the best way to produce multiple output files/tables. What you're likely running into is the fact that each write allocates a buffer for the uploads, so if you have a partition followed by N writes, there are N buffers.

There are two strategies for making this work.

  1. You can reduce the size of the upload buffers using the uploadBufferSizeBytes option in GcsOptions (see the sketch after this list). Note that this may slow down the uploads, since the buffers will need to be flushed more frequently.
  2. You can apply a Reshuffle operation to each PCollection after the partition. This will limit the number of concurrent BigQuery sinks running simultaneously, so fewer buffers will be allocated.
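For the first option, the upload buffer size is a pipeline option on GcsOptions. Here is a minimal sketch, assuming the Dataflow SDK's setUploadBufferSizeBytes setter and an arbitrary 1 MB value (tune it for your job):

// Sketch: shrink the per-upload GCS buffer so that many concurrent sinks
// allocate less memory in total. The 1 MB value is only an example.
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
  .withValidation()
  .as(DataflowPipelineOptions.class);
options.as(GcsOptions.class).setUploadBufferSizeBytes(1024 * 1024);
Pipeline p = Pipeline.create(options);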

For the second option (reshuffling after the partition), you could do something like:

PCollection<Data> allData = ...;
PCollectionList<Data> partitions = allData.apply(Partition.of(...));

// Assuming the partitioning function has produced numDays partitions,
// and those can be mapped back to the day in some meaningful way:
for (int i = 0; i < numDays; i++) {
  String outputName = nameFor(i); // compute the output name
  partitions.get(i)
    .apply("Write_" + outputName, ReshuffleAndWrite(outputName));
}

That makes use of these two helper PTransforms:

private static class Reshuffle<T>
  extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> apply(PCollection<T> in) {
    return in
      .apply("Random Key", WithKeys.of(
          new SerializableFunction<T, Integer>() {
            @Override
            public Integer apply(T value) {
              return ThreadLocalRandom.current().nextInt();
            }
          }))
      .apply("Shuffle", GroupByKey.<Integer, T>create())
      .apply("Remove Key", Values.create());
  }
}

private static class ReshuffleAndWrite 
  extends PTransform<PCollection<Data>, PDone> {

  private final String outputName;
  public ReshuffleAndWrite(String outputName) {
    this.outputName = outputName;
  }

  @Override
  public PDone apply(PCollection<Data> in) {
    return in
      .apply("Reshuffle", new Reshuffle<Data>())
      .apply("Write", BigQueryIO.Write.to(tableNameFor(outputName)
        .withSchema(schema)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
  }
}
