BigQuery in Dataflow fails to load data from Cloud Storage: JSON object specified for non-record field


Problem Description

I have a Dataflow pipeline running locally on my machine writing to BigQuery. BigQuery in this batch job requires a temporary location, and I have provided one in my Cloud Storage. The relevant parts are:

PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
        .setTempLocation("gs://folder/temp");
Pipeline p = Pipeline.create(options);

....

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("uuid").setType("STRING"));
fields.add(new TableFieldSchema().setName("start_time").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("end_time").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);

session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
    .apply(BigQueryIO.Write
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .to("myproject:db.table"));

Where for FormatAsTableRowFn I have:

static class FormatAsTableRowFn extends DoFn<KV<String, String>, TableRow>
    implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = new TableRow()
        .set("uuid", c.element().getKey())
        // include a field for the window timestamp
        .set("start_time", ((IntervalWindow) c.window()).start().toInstant()) // NOTE: I tried both with and
        .set("end_time", ((IntervalWindow) c.window()).end().toInstant());    // without .toInstant, receiving the same error
    c.output(row);
  }
}

If I print out row.toString() I will get legit timestamps:

{uuid=00:00:00:00:00:00, start_time=2016-09-22T07:34:38.000Z, end_time=2016-09-22T07:39:38.000Z}

When I run this code, Java says: Failed to create the load job beam_job_XXX

Manually inspecting the temp folder in GCS, the objects look like:

{"mac":"00:00:00:00:00:00","start_time":{"millis":1474529678000,"chronology":{"zone":{"fixed":true,"id":"UTC"}},"zone":{"fixed":true,"id":"UTC"},"afterNow":false,"beforeNow":true,"equalNow":false},"end_time":{"millis":1474529978000,"chronology":{"zone":{"fixed":true,"id":"UTC"}},"zone":{"fixed":true,"id":"UTC"},"afterNow":false,"beforeNow":true,"equalNow":false}}

Looking at the failed job report in BigQuery, the error says:

JSON object specified for non-record field: start_time (error code: invalid)

This is very strange, because I am pretty sure I declared this field a TIMESTAMP, and I am 100% sure my schema in BigQuery matches the TableSchema in the SDK. (NOTE: setting withCreateDisposition to CREATE_IF_NEEDED yields the same result)

Could someone please tell me how I need to remedy this to get the data inside BigQuery?

Solution

Don't use Instant objects. They get serialized as the nested JSON objects you see in your staged temp files, and BigQuery rejects a JSON object where a scalar TIMESTAMP value is expected. Try using milliseconds/seconds instead.

https://cloud.google.com/bigquery/data-types

A positive number specifies the number of seconds since the epoch

So, something like this should work:

.getMillis() / 1000
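
Applied to the FormatAsTableRowFn from the question, the fix is a one-line change per timestamp field. Here is a minimal sketch, assuming the same Dataflow 1.x SDK classes and the same surrounding pipeline as above; only the two timestamp .set(...) calls change:

static class FormatAsTableRowFn extends DoFn<KV<String, String>, TableRow>
    implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    IntervalWindow window = (IntervalWindow) c.window();
    TableRow row = new TableRow()
        .set("uuid", c.element().getKey())
        // getMillis() returns epoch milliseconds; dividing by 1000 produces the
        // plain epoch-seconds number that a BigQuery TIMESTAMP field accepts.
        .set("start_time", window.start().getMillis() / 1000)
        .set("end_time", window.end().getMillis() / 1000);
    c.output(row);
  }
}

Note that integer division drops sub-second precision (the window boundaries above fall on whole seconds; use / 1000.0 to keep milliseconds as a fractional timestamp). With this change, the staged rows in the GCS temp folder should contain plain numbers, e.g.:

{"uuid":"00:00:00:00:00:00","start_time":1474529678,"end_time":1474529978}

which BigQuery can load into TIMESTAMP columns.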
