How can I specify/define a coder for a class that wraps TableRow

Question
I have defined a class that wraps com.google.api.services.bigquery.model.TableRow, defining it as an internal member:

public class TableRowWrapper implements Serializable {
  private TableRow tableRow;

  public TableRowWrapper() {
  }
  ...
}

I also have some DoFn that processes input/output instances of that TableRowWrapper class, resulting in a PCollection<TableRowWrapper>. I've tried annotating that class with @DefaultCoder(SerializableCoder.class) and @DefaultCoder(AvroCoder.class), but it always fails to encode because it can't find a coder for the member attribute instance of TableRow.
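The failing annotation-based attempt can be sketched as below (a sketch, assuming the Dataflow SDK 1.x imports; this is the approach that produces the trace that follows, because AvroCoder cannot derive a schema for the TableRow member):

```java
import java.io.Serializable;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;

// Sketch of the failing approach: AvroCoder's reflective schema
// generation chokes on TableRow's nullable, un-annotated fields.
@DefaultCoder(AvroCoder.class)
public class TableRowWrapper implements Serializable {
  private TableRow tableRow;

  public TableRowWrapper() {}
}
```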
Here is an example when using AvroCoder:
java.lang.IllegalArgumentException: Unable to encode element 'com.test.bigquery.api.TableRowWrapper@5129e8a6' with coder 'AvroCoder'.
at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:177)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
Caused by: java.lang.NullPointerException: in com.test.bigquery.api.TableRowWrapper in com.google.api.services.bigquery.model.TableRow in array null of array in field f of com.google.api.services.bigquery.model.TableRow in field tableRow of com.test.bigquery.api.TableRowWrapper
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.encode(AvroCoder.java:227)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:174)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:67)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
... 31 more
How can I define a coder for this class?

Answer

As you've noticed, since TableRow is not Serializable, you won't be able to use SerializableCoder.

In order to encode nullable values, Avro's automatic schema generation requires either an explicit union schema including null via an @AvroSchema annotation or a @Nullable annotation -- specifically org.apache.avro.reflect.Nullable, not javax.annotation.Nullable. Neither is present in TableRow, so AvroCoder is also inapplicable.

Perhaps the easiest way to provide a coder for your TableRowWrapper is to do so directly via a thin wrapper on TableRowJsonCoder:

class TableRowWrapperCoder extends CustomCoder<TableRowWrapper> {
private static final Coder<TableRow> tableRowCoder = TableRowJsonCoder.of();
@Override
public void encode(TableRowWrapper value, OutputStream outStream, Context context)
throws IOException {
tableRowCoder.encode(value.getRow(), outStream, context);
}
@Override
public TableRowWrapper decode(InputStream inStream, Context context)
throws IOException {
return new TableRowWrapper(tableRowCoder.decode(inStream, context));
}
...
}

You can register this coder for your entire pipeline via:
pipeline.getCoderRegistry()
.registerCoder(TableRowWrapper.class, new TableRowWrapperCoder());
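For the coder above to compile, TableRowWrapper must expose its row and be constructible from one. The question's class is abbreviated, so the getRow() accessor and the TableRow-taking constructor below are assumptions sketched in for completeness:

```java
import java.io.Serializable;

import com.google.api.services.bigquery.model.TableRow;

public class TableRowWrapper implements Serializable {
  private TableRow tableRow;

  public TableRowWrapper() {}

  // Assumed by TableRowWrapperCoder.decode(...)
  public TableRowWrapper(TableRow tableRow) {
    this.tableRow = tableRow;
  }

  // Assumed by TableRowWrapperCoder.encode(...)
  public TableRow getRow() {
    return tableRow;
  }
}
```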