Data is written to BigQuery but not in proper format


Problem description




I'm writing data to BigQuery and successfully gets written there. But I'm concerned with the format in which it is getting written.

Below is the format in which the data is shown when I execute any query in BigQuery (screenshot: https://i.stack.imgur.com/Lek2L.png):

Check the first row: the value of SalesComponent is CPS_H, but it's showing 'BeamRecord [dataValues=[CPS_H', and in ModelIteration the value ends with a square bracket.

Below is the code used to push data to BigQuery from BeamSql:

TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
    new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));

TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
    (MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
        .apply(BigQueryIO.writeTableRows()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

EDIT

I have transformed the BeamRecord into the MyOutputClass type using the code below, and this also doesn't work:

PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) {
        BeamRecord record = c.element();
        String[] strArr = record.toString().split(",");
        MyOutputClass moc = new MyOutputClass();
        moc.setSalesComponent(strArr[0]);
        moc.setDuetoValue(strArr[1]);
        moc.setModelIteration(strArr[2]);
        c.output(moc);
    }
}));

Solution

It looks like your MyOutputClass is constructed incorrectly (with incorrect values). If you look at it, BigQueryIO is able to create rows with the correct fields just fine, but those fields have the wrong values. That means by the time you call .set("SalesComponent", elem.SalesComponent), you already have incorrect data in elem.

My guess is the problem is in some previous step, when you convert from BeamRecord to MyOutputClass. You would get a result similar to what you're seeing if you did something like this (or some other conversion logic did this for you behind the scenes):

  • convert the BeamRecord to a string by calling beamRecord.toString() — if you look at the BeamRecord.toString() implementation, you can see that you're getting exactly that string format;
  • split this string by "," to get an array of strings;
  • construct MyOutputClass from that array.

Pseudocode for this is something like:

PCollection<MyOutputClass> final_out =
  beamRecords
    .apply(
      ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
           BeamRecord record = c.element();
           String[] fields = record.toString().split(",");
           MyOutputClass elem = new MyOutputClass();
           elem.SalesComponent = fields[0];
           elem.DuetoValue = fields[1];
           ...
           c.output(elem);
        }
      })
    );
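To see concretely why splitting the toString() output garbles the values, here is a small stand-alone sketch (plain Java, no Beam dependency; the string format below is an approximation of what BeamRecord.toString() produces, and the field values are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class ToStringSplitDemo {
    public static void main(String[] args) {
        // Hypothetical field values standing in for a real record.
        List<String> dataValues = Arrays.asList("CPS_H", "0.42", "iter_1");

        // A BeamRecord-style string representation wraps the values
        // in "BeamRecord [dataValues=[...]]".
        String repr = "BeamRecord [dataValues=" + dataValues + ", dataTypes=[...]]";

        // Splitting on "," keeps the wrapper text attached to the
        // first and last values, which is exactly the symptom seen
        // in the BigQuery table.
        String[] fields = repr.split(",");
        System.out.println(fields[0]); // prints "BeamRecord [dataValues=[CPS_H"
    }
}
```

The first array element carries the "BeamRecord [dataValues=[" prefix and the last one carries a trailing "]", matching the corrupted rows in the question.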

The correct way to do something like this is to call getters on the record instead of splitting its string representation, along these lines (pseudocode):

PCollection<MyOutputClass> final_out =
  beamRecords
    .apply(
      ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
           BeamRecord record = c.element();
           MyOutputClass elem = new MyOutputClass();

           // get field value by name
           elem.SalesComponent = record.getString("CPS_H...");

           // get another field value by name
           elem.DuetoValue = record.getInteger("...");
           ...
           c.output(elem);
        }
      })
    );
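The difference between the two approaches can be shown without any Beam dependency. The Row class below is a hypothetical stand-in for a BeamRecord-like row keyed by field name (the field names match the schema in the question; the values are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GetterVsSplitDemo {
    // Hypothetical stand-in for a record type that exposes
    // typed getters keyed by field name.
    static class Row {
        private final Map<String, Object> values = new LinkedHashMap<>();

        Row set(String name, Object value) {
            values.put(name, value);
            return this;
        }

        String getString(String name) {
            return (String) values.get(name);
        }
    }

    public static void main(String[] args) {
        Row record = new Row()
            .set("SalesComponent", "CPS_H")
            .set("DuetoValue", "0.42")
            .set("ModelIteration", "iter_1");

        // The getter returns the clean value, with no
        // "BeamRecord [dataValues=[" wrapper around it.
        System.out.println(record.getString("SalesComponent")); // prints "CPS_H"
    }
}
```

A getter looks the value up by field name, so the string representation of the whole record never enters the picture and no wrapper text can leak into the output fields.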

You can verify something like this by adding a simple ParDo where you either put a breakpoint and look at the elements in the debugger, or output the elements somewhere else (e.g. console).

