How can I specify/define a coder for a class that wraps TableRow


Problem description


I have defined a class that wraps the com.google.api.services.bigquery.model.TableRow class, holding it as an internal member:

public class TableRowWrapper implements Serializable {

    private TableRow tableRow;

    public TableRowWrapper() {
    } 
...
}

I also have some DoFns that process input/output instances of that TableRowWrapper class, resulting in a PCollection<TableRowWrapper>. I've tried annotating the class with @DefaultCoder(SerializableCoder.class) and @DefaultCoder(AvroCoder.class), but it always fails to encode because it can't find a coder for the TableRow member. Here is an example when using AvroCoder:

 java.lang.IllegalArgumentException: Unable to encode element 'com.test.bigquery.api.TableRowWrapper@5129e8a6' with coder 'AvroCoder'.
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:177)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
Caused by: java.lang.NullPointerException: in com.test.bigquery.api.TableRowWrapper in com.google.api.services.bigquery.model.TableRow in array null of array in field f of com.google.api.services.bigquery.model.TableRow in field tableRow of com.test.bigquery.api.TableRowWrapper
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.encode(AvroCoder.java:227)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:174)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:67)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    ... 31 more

How can I define a coder for this class?

Solution

As you've noticed, since TableRow is not Serializable, you won't be able to use SerializableCoder.
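The SerializableCoder limitation is easy to reproduce with plain Java serialization: if any reachable field's type does not implement Serializable, ObjectOutputStream throws a NotSerializableException. Here is a minimal stdlib-only sketch; Inner and Wrapper are hypothetical stand-ins for TableRow and TableRowWrapper:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Stands in for TableRow: does not implement Serializable.
    static class Inner {
        String value = "x";
    }

    // Stands in for TableRowWrapper: Serializable itself, but its member is not.
    static class Wrapper implements Serializable {
        private Inner inner = new Inner();
    }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new Wrapper());
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            // Serialization fails on the non-serializable member,
            // just as SerializableCoder would fail on the TableRow field.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

Marking the wrapper Serializable does not help, because Java serialization must still walk into the TableRow field.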

In order to encode nullable values, Avro's automatic schema generation requires either an explicit union schema that includes null, via the @AvroSchema annotation, or a @Nullable annotation, specifically org.apache.avro.reflect.Nullable, not javax.annotation.Nullable. Neither is present on TableRow, so AvroCoder is also inapplicable.

Perhaps the easiest way to provide a coder for your TableRowWrapper is to do so directly, via a thin wrapper on TableRowJsonCoder:

class TableRowWrapperCoder extends CustomCoder<TableRowWrapper> {

  private static final Coder<TableRow> tableRowCoder = TableRowJsonCoder.of();

  @Override
  public void encode(TableRowWrapper value, OutputStream outStream, Context context)
      throws IOException {
    tableRowCoder.encode(value.getRow(), outStream, context);
  }

  @Override
  public TableRowWrapper decode(InputStream inStream, Context context)
      throws IOException {
    return new TableRowWrapper(tableRowCoder.decode(inStream, context));
  }

  ...
}

You can register this coder for your entire pipeline via

pipeline.getCoderRegistry()
    .registerCoder(TableRowWrapper.class, new TableRowWrapperCoder());
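The delegation pattern above is the whole trick: the wrapper's coder does no encoding of its own, it simply forwards the inner value to a coder that already knows how to handle it, and re-wraps on decode. A stdlib-only sketch of the same round-trip shape, with StringWrapper and StringWrapperCoder as hypothetical stand-ins (the real code delegates to TableRowJsonCoder instead of DataOutputStream):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class DelegatingCoderDemo {
    // Stands in for TableRowWrapper: wraps a value the framework can't code directly.
    static class StringWrapper {
        private final String row;
        StringWrapper(String row) { this.row = row; }
        String getRow() { return row; }
    }

    // Stands in for TableRowWrapperCoder: delegates all real work to an inner encoding.
    static class StringWrapperCoder {
        void encode(StringWrapper value, OutputStream outStream) throws IOException {
            // Delegate: encode only the wrapped value, like tableRowCoder.encode(...).
            new DataOutputStream(outStream).writeUTF(value.getRow());
        }

        StringWrapper decode(InputStream inStream) throws IOException {
            // Delegate: decode the inner value and re-wrap it.
            return new StringWrapper(new DataInputStream(inStream).readUTF());
        }
    }

    public static void main(String[] args) throws IOException {
        StringWrapperCoder coder = new StringWrapperCoder();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(new StringWrapper("{\"f\":[]}"), bytes);
        StringWrapper back = coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(back.getRow());
    }
}
```

Because encode and decode are exact inverses of the inner coder's, the wrapper round-trips losslessly for any value its inner coder can handle.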
