How do I read and transform CSV Headers before BigQueryIO.Write?


Question


I have a csv file which contains headers as the first row. I'm reading it in and cleaning up those headers to match BigQuery column requirements. But I need a reference to the schema before the pipeline begins. What are the best practices for allowing BigQueryIO.Write to be responsive to the headers in this way? Currently my code looks something like this:

//create table
Table table = new Table();
// Where logically should the following line go?
TableSchema customSchema = ?
table.setSchema(customSchema);
TableReference tableRef = new TableReference();
tableRef.setDatasetId("foo_dataset");
tableRef.setProjectId("bar_project");
tableRef.setTableId("baz_table");
table.setTableReference(tableRef);

Pipeline p = Pipeline.create(options);

p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
  // Detect if it's header row
  .apply(ParDo.of(new ExtractHeader()))
  .apply(ParDo.of(new ToTableRow()))
  .apply(BigQueryIO.Write.named("Write")
    .to(tableRef)
    // Where logically should the following line go?
    .withSchema(customSchema));
p.run();
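As a side note, the header clean-up mentioned in the question can live entirely outside the pipeline as plain string manipulation. A minimal sketch (the `HeaderSanitizer` helper is hypothetical, and the column-name rules assumed here are letters, digits, and underscores, not starting with a digit):

```java
import java.util.ArrayList;
import java.util.List;

public class HeaderSanitizer {
    // Hypothetical helper: normalize a raw CSV header into a column name
    // using the assumed rules above (non-conforming characters become
    // underscores; a leading digit gets an underscore prefix).
    public static String sanitize(String rawHeader) {
        String name = rawHeader.trim().replaceAll("[^A-Za-z0-9_]", "_");
        if (name.isEmpty() || Character.isDigit(name.charAt(0))) {
            name = "_" + name;
        }
        return name;
    }

    public static List<String> sanitizeAll(String headerLine) {
        List<String> out = new ArrayList<>();
        for (String h : headerLine.split(",")) {
            out.add(sanitize(h));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [First_Name, _2nd_col, email_addr]
        System.out.println(sanitizeAll("First Name,2nd col,email@addr"));
    }
}
```

Because this is pure string work, it can run either before the pipeline (to build the schema) or inside a DoFn (to clean row keys), without coupling to execution order.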


I'm currently trying to implement perhaps two pipelines, looking (roughly) like the following, but the execution order is unreliable in Dataflow so I am getting errors where the BQ table doesn't exist.

PCollection<String> readIn = p.apply(TextIO.Read.named("ReadCSV").from("gs://bucket/file.csv"))
  .apply(ParDo.of(new ExtractHeader()));
TableSchema customSchema = /* generate schema based on what I now know the headers are */
readIn.apply(ParDo.of(new ToTableRow()))
  .apply(BigQueryIO.Write.named("Write")
    .to(tableRef)
    // Where logically should the following line go?
    .withSchema(customSchema));
p.run();
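One way around the ordering problem in the two-pipeline attempt (my own workaround sketch, not part of the accepted answer) is to read just the header line before constructing any pipeline, so the schema is known up front and a single pipeline suffices. Shown against a local path for brevity; for a gs:// source the same one-line read can be done with the GCS client library:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class HeaderFirst {
    // Read only the first line of the CSV, outside the pipeline,
    // so the TableSchema can be built before Pipeline.create(...).
    public static String readHeaderLine(Path csv) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(csv)) {
            return reader.readLine();
        }
    }
}
```

With the header in hand, build customSchema from it, pass it to .withSchema(...) in a single pipeline, and filter out the header row inside ExtractHeader as before.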

Answer


This feature (dynamic schemas) is in review right now at https://github.com/apache/beam/pull/2609 (I'm reviewing it). You can try the in-progress PR, but note that its API is likely to change somewhat as a result of the review. I'll update this answer when the PR is submitted.
