Import CSV file from GCS to BigQuery


Question

I'm trying to figure out how to load a CSV file from GCS into BigQuery. Pipeline below:

    // Create the pipeline
    Pipeline p = Pipeline.create(options);

    // Create the PCollection from csv
    PCollection<String> lines = p.apply(TextIO.read().from("gs://impression_tst_data/incoming_data.csv"));


    // Transform into TableRow
    PCollection<TableRow> row = lines.apply(ParDo.of(new StringToRowConverter()));


    // Write table to BigQuery
    row.apply(BigQueryIO.<TableRow>writeTableRows()
            .to("project_id:dataset.table")
            .withSchema(getSchema())
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

Here is the StringToRowConverter class I'm using in the ParDo to create a TableRow PCollection:

// StringToRowConverter
static class StringToRowConverter extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }
}

Looking at the staging files, it looks like this creates TableRows of JSON that lump the CSV into a single column named "string_field". If I don't define string_field in my schema, the job fails. When I do define string_field, it writes each row of the CSV into that column and leaves all the other columns defined in my schema empty. I know this is expected behavior.

So my question: How do I take this JSON output and write it into the schema? Sample output and schema below...

{"string_field": "6/26/17 21:28,Dave Smith,1 Learning Drive,867-5309,etc"}

Schema:

static TableSchema getSchema() {
    return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
        // Compose the list of TableFieldSchema from tableSchema.
        {
            add(new TableFieldSchema().setName("Event_Time").setType("TIMESTAMP"));
            add(new TableFieldSchema().setName("Name").setType("STRING"));
            add(new TableFieldSchema().setName("Address").setType("STRING"));
            add(new TableFieldSchema().setName("Phone").setType("STRING"));
            add(new TableFieldSchema().setName("etc").setType("STRING"));
        }
    });
}
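Note that the schema declares `Event_Time` as `TIMESTAMP`, but the sample value `6/26/17 21:28` is not in a form BigQuery accepts directly; it expects a canonical civil timestamp such as `2017-06-26 21:28:00`. A minimal sketch of the conversion with `java.time` (the `M/d/yy HH:mm` input pattern is an assumption inferred from the sample row):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class EventTimeFormat {
    // Input pattern assumed from the sample value "6/26/17 21:28"
    private static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("M/d/yy HH:mm");
    // BigQuery accepts timestamps in "yyyy-MM-dd HH:mm:ss" form
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String toBigQueryTimestamp(String raw) {
        return LocalDateTime.parse(raw, IN).format(OUT);
    }

    public static void main(String[] args) {
        System.out.println(toBigQueryTimestamp("6/26/17 21:28")); // → 2017-06-26 21:28:00
    }
}
```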

Is there a better way of doing this than using the StringToRowConverter?

I need to use a ParDo to create a TableRow PCollection before I can write it out to BQ. However, I'm unable to find a solid example of how to take in a CSV PCollection, transform to TableRow and write it out.

Yes, I am a noob trying to learn here. I'm hoping somebody can help me with a snippet or point me in the right direction on the easiest way to accomplish this. Thanks in advance.

Answer

The code in your StringToRowConverter DoFn should parse the string and produce a TableRow with multiple fields. Since each row is comma separated, this would likely involve splitting the string on commas, and then using your knowledge of the column order to do something like:

String inputLine = c.element();

// May need to make the line parsing more robust, depending on your
// files. Look at how to parse rows of a CSV using Java.
// Note: String.split takes a String regex, not a char.
String[] split = inputLine.split(",");

// Also, you may need to handle errors such as not enough columns, etc.

TableRow output = new TableRow();
output.set("Event_Time", split[0]); // may want to parse the string
output.set("Name", split[1]);
...
c.output(output);
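As the comment above notes, a bare `split(",")` breaks if any field contains an embedded comma (e.g. a quoted address). One way to make the parsing more robust is a small state machine that honors double-quoted fields. This is an illustrative sketch only; in production a CSV library such as Apache Commons CSV or opencsv is the safer choice:

```java
import java.util.ArrayList;
import java.util.List;

public class CsvLineParser {
    // Splits one CSV line on commas, keeping commas inside
    // double-quoted fields as part of the field value.
    static List<String> parse(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char ch = line.charAt(i);
            if (ch == '"') {
                inQuotes = !inQuotes;       // toggle quoted state, drop the quote itself
            } else if (ch == ',' && !inQuotes) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
            } else {
                current.append(ch);
            }
        }
        fields.add(current.toString());     // last field has no trailing comma
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("6/26/17 21:28,Dave Smith,\"1 Learning Drive, Apt 2\",867-5309,etc"));
    }
}
```

Unlike `String.split`, this also preserves empty trailing fields, which matters when optional columns at the end of a row are blank.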
