Using AvroCoder for Custom Types with Generics


Problem description


I am trying to use AvroCoder to serialise a custom type that is passed around in PCollections in my pipeline. The custom type has a generic field (currently a String). When I run the pipeline, I get an AvroTypeException like the one below, probably because of the generic field. Is building and passing an Avro schema for the object the only way around this?

Exception in thread "main" org.apache.avro.AvroTypeException: Unknown type: T
 at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
 at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472)
 at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
 at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:116)

I have also attached my coder registration code for reference.

pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        return AvroCoder.of(GenericTypeClass.class);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        return Collections.singletonList(((GenericTypeClass<Object>) value).key);
    }
});

Solution

You’ve done everything right as far as setting up the CoderFactory, but Avro’s ReflectData mechanism, which AvroCoder uses to automatically generate a schema, does not work for generic types as of this writing. This is tracked as issue AVRO-1571. See also this StackOverflow question.
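To see why reflection-based schema generation struggles here, the following minimal sketch uses only the JDK to inspect a generic field. `GenericHolder` and `ErasureDemo` are hypothetical names standing in for the class in the question; nothing here calls Avro itself. It illustrates that the declared type of the field is the type variable `T`, not a concrete class, which is consistent with the "Unknown type: T" message in the stack trace.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for GenericTypeClass<T> from the question.
class GenericHolder<T> {
    T genericField;
}

final class ErasureDemo {
    // Describes the declared type of a field as Java reflection reports it.
    static String describeFieldType(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            Type type = field.getGenericType();
            if (type instanceof TypeVariable) {
                // No concrete class is available here, only the variable's name.
                return "type variable " + ((TypeVariable<?>) type).getName();
            }
            return "concrete type " + type.getTypeName();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // Prints: type variable T
        System.out.println(describeFieldType(GenericHolder.class, "genericField"));
    }
}
```

Because the class object alone carries no information about which `T` a given instance uses, any schema for the field has to come from somewhere else, which is what the two approaches below provide.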

In order to allow encoding of GenericTypeClass<T> for some particular values of T, you are correct that you will have to provide some explicit schema information. There are two ways to proceed:

The first approach is to provide an explicit schema on fields of type T within your GenericTypeClass<T>, like so:

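import org.apache.avro.reflect.AvroSchema;

class GenericTypeClass<T> {
  // Avro requires a no-args constructor
  public GenericTypeClass() {}

  // Union of every type that T may take; the list must be fixed at compile time
  @AvroSchema("[\"string\", \"int\", ...]")
  private T genericField;
}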

The drawback is that it is limited to a finite, static union schema, and requires manually inlining the JSON schema for more complex values of T.

The second approach is to provide an explicit schema when you build an AvroCoder in your CoderFactory, and provide this schema to AvroCoder.of(Class, Schema).

pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
      // schemaFromCoder is a helper you write yourself; it is not part of the SDK
      return AvroCoder.of(
          GenericTypeClass.class,
          schemaFromCoder(componentCoders.get(0)));
  }

  ...
});

This will mostly revolve around converting a Coder<T> into a schema for T. This should be easy for basic types and manageable for POJOs that ReflectData supports. It also provides a hook for ad hoc support of more difficult cases.
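As a rough illustration of the "easy for basic types" case, here is a dependency-free sketch that builds the Avro record schema JSON for one particular T. Everything in it is an assumption made for illustration: the class name `SchemaJsonSketch`, the helper names, the `com.example` namespace, and the choice to key on a Java `Class` rather than on a `Coder<T>` (a real `schemaFromCoder` would first have to determine the type the coder handles). Only the Avro primitive type names and the record schema layout come from the Avro specification.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SchemaJsonSketch {
    // Java types mapped to Avro primitive type names (illustrative subset).
    private static final Map<Class<?>, String> PRIMITIVES = new LinkedHashMap<>();
    static {
        PRIMITIVES.put(String.class, "string");
        PRIMITIVES.put(Integer.class, "int");
        PRIMITIVES.put(Long.class, "long");
        PRIMITIVES.put(Float.class, "float");
        PRIMITIVES.put(Double.class, "double");
        PRIMITIVES.put(Boolean.class, "boolean");
        PRIMITIVES.put(byte[].class, "bytes");
    }

    // Returns the Avro primitive name for a Java type, or fails loudly.
    static String avroTypeName(Class<?> javaType) {
        String name = PRIMITIVES.get(javaType);
        if (name == null) {
            throw new IllegalArgumentException(
                "No Avro mapping sketched for " + javaType.getName());
        }
        return name;
    }

    // Builds the JSON for a record with a single field of the given type.
    static String recordSchemaJson(
            String namespace, String recordName, String fieldName, Class<?> fieldType) {
        return "{\"type\":\"record\""
            + ",\"name\":\"" + recordName + "\""
            + ",\"namespace\":\"" + namespace + "\""
            + ",\"fields\":[{\"name\":\"" + fieldName + "\""
            + ",\"type\":\"" + avroTypeName(fieldType) + "\"}]}";
    }

    public static void main(String[] args) {
        System.out.println(recordSchemaJson(
            "com.example", "GenericTypeClass", "genericField", String.class));
    }
}
```

With Avro on the classpath, the resulting string could be parsed with `new Schema.Parser().parse(json)` and the `Schema` handed to AvroCoder.of(Class, Schema). The namespace and record name would likely need to match the real class's package and name so that reflection can locate it; POJO and other non-primitive cases are not covered by this sketch.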
