Inserting repeated records into Big Query with Java API/Dataflow - "Repeated field must be imported as a JSON array"


Problem description

I have data with repeated key-value (String,String) record pairs as one of the fields in a Big Query table schema.

I am trying to add these repeated records using the approach here: http://sookocheff.com/post/bigquery/creating-a-big-query-table-java-api/

The table schema created for the repeated record field looks like this:

TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
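For reference, the field built by the code above should be equivalent to this fragment of a BigQuery JSON table schema (a repeated RECORD with two nested STRING fields):

```json
{
  "name": "rawFields",
  "type": "RECORD",
  "mode": "REPEATED",
  "fields": [
    { "name": "key",   "type": "STRING" },
    { "name": "value", "type": "STRING" }
  ]
}
```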

I am inserting data like this as part of a DoFn:

Map<String,String> record = ... // key-value pairs
List<TableRow> rawFields = new ArrayList<>();
record.forEach((k,v)->
    rawFields.add(new TableRow().set("key",k).set("value", v))
);
TableRow row = new TableRow();
// row has other fields, omitted here
row.set("rawFields", rawFields);
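As a sanity check on that conversion, independent of the Dataflow SDK, the same map-to-list shape can be exercised with plain `Map`s standing in for `TableRow`. The `toRawFields` helper below is illustration only, not part of the pipeline: the point is that a map with N entries must become a list of N two-field objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RawFieldsDemo {
    // Stand-in for the TableRow conversion: each map entry becomes one
    // {key, value} object, and the repeated field's value is the whole list.
    static List<Map<String, String>> toRawFields(Map<String, String> record) {
        List<Map<String, String>> rawFields = new ArrayList<>();
        record.forEach((k, v) -> {
            Map<String, String> element = new LinkedHashMap<>();
            element.put("key", k);
            element.put("value", v);
            rawFields.add(element);
        });
        return rawFields;
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("host", "example.com");
        record.put("path", "/index");
        // A 2-entry map yields a 2-element list of {key, value} objects.
        System.out.println(toRawFields(record));
    }
}
```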

The DoFn is in my dataflow pipeline just before the BigQueryIO.Write:

.apply(BigQueryIO.Write
        .named("WriteLBLogLines")
        .to("xxx:yyy.zzz")
        .withSchema(mySchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

When I try to run this through Dataflow I get the following error:

errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0 at file: gs://xxxxxxxxxx/12271006010671464167/dax-tmp-2016-06-28_14_47_26-12271006010671462904-S04-1-303c4f638f6b411b/-shard-00002-of-00003-try-021aff4c448b3177-endshard.json. Repeated field must be imported as a JSON array. Field: rawFields.
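The error means the value of `rawFields` reached BigQuery as something other than a JSON array: for a REPEATED RECORD field, each row's JSON must carry the field as an array of objects, even when there is only one element. A minimal stdlib sketch of the required shape (the `toJson` serializer here is a toy written for this demo, not a real library call):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RepeatedFieldJson {
    // Toy serializer for this demo only: Map -> JSON object, List -> JSON array,
    // everything else -> quoted string.
    static String toJson(Object o) {
        if (o instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            String sep = "";
            for (Map.Entry<?, ?> e : ((Map<?, ?>) o).entrySet()) {
                sb.append(sep).append("\"").append(e.getKey()).append("\":")
                  .append(toJson(e.getValue()));
                sep = ",";
            }
            return sb.append("}").toString();
        }
        if (o instanceof List) {
            StringBuilder sb = new StringBuilder("[");
            String sep = "";
            for (Object item : (List<?>) o) {
                sb.append(sep).append(toJson(item));
                sep = ",";
            }
            return sb.append("]").toString();
        }
        return "\"" + o + "\"";
    }

    public static void main(String[] args) {
        Map<String, String> element = new LinkedHashMap<>();
        element.put("key", "k1");
        element.put("value", "v1");

        // Correct: the repeated field's value is a List, so it serializes
        // as a JSON array even with a single element.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("rawFields", List.of(element));
        System.out.println(toJson(row));
        // -> {"rawFields":[{"key":"k1","value":"v1"}]}

        // Setting the single element object directly (not wrapped in a list)
        // would instead produce {"rawFields":{...}}, which is exactly what
        // "Repeated field must be imported as a JSON array" rejects.
    }
}
```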

What is wrong with my approach? It seems I am not inserting these repeated records correctly.

Recommended answer

I've attempted to reproduce the problem with the following code, but it executes successfully. Are there other aspects of the schema that could be at issue?

List<TableFieldSchema> fields = new ArrayList<>();
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
fields.add(column);
TableSchema schema = new TableSchema().setFields(fields);

TableRow row = new TableRow();
List<TableRow> rawFields = new ArrayList<>();
rawFields.add(new TableRow().set("key","foo").set("value", "bar"));
row.set("rawFields", rawFields);

Pipeline p = Pipeline.create(options);
PCollection<TableRow> c =
    p.apply(Create.of(row, row).withCoder(TableRowJsonCoder.of()));
c.apply(BigQueryIO.Write.named("BigQuery-Write")
        .to(options.getOutput())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withSchema(schema));
p.run();
