Making Transformations in Dataflow Generic


Question

This is related to another SO question (Setting Custom Coders & Handling Parameterized types). Following the workarounds there helped me use custom types in the transforms. But since my custom types are generic, I was hoping to make the transform classes generic too, so that they could parameterize the custom type with the same type. When I try that, however, I run into "Cannot provide a Coder for type variable T because the actual type is unknown due to erasure". The workaround suggested registering a coder that would return the type parameter, but since the type parameter itself is unknown, I guess that is why this exception is thrown, and I am not sure how to get around it.

static class Processor<T>
  extends PTransform<PCollection<String>,
                     PCollection<KV<String, Set<CustomType<T>>>>> {

  private static final long serialVersionUID = 0;

  @Override public PCollection<KV<String, Set<CustomType<T>>>>
  apply(PCollection<String> items) {
    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items
        .apply(ParDo.of(new ParDoFn()));
    PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
        .apply(Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
    return combinedItems;
  }
}

Answer

This also appears to be caused by GitHub issue #57, and it should be fixed along with that issue.

In the meantime, Dataflow already includes more advanced features that let you solve the problem now. Judging from your code snippet, the whole system probably looks something like this:

class CustomType<T extends Serializable> { ... }

class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

    class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }

    class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }

    @Override
    public PCollection<KV<String, Set<CustomType<T>>>>
    apply(PCollection<String> items) {

      PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
          items.apply(ParDo.of(new ParDoFn()));

      PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
          partitionedItems.apply(
              Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
                  new Merger()));

      return combinedItems;
    }
}

…

PCollection<String> input = ...;
input.apply(new Processor<String>());

Dataflow obtains the output type of each DoFn from the TypeDescriptor returned by its getOutputTypeDescriptor method.

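Because your ParDoFn is an inner class of Processor<T>, its output type descriptor is simply KV<String, Set<CustomType<T>>>, with T still an unresolved type variable. This holds even when the enclosing transform is instantiated as new Processor<String>(). Because of erasure, the type argument String at that call site is not recorded anywhere that can be inspected at runtime.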
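The erasure effect can be reproduced in plain Java, without any Dataflow classes. In this minimal sketch, `Fn` and `Processor` are hypothetical stand-ins for DoFn and the question's Processor. `declaredOutputType` is a hypothetical helper that reads the type argument from the inner class's generic superclass using standard reflection (`Class.getGenericSuperclass`):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-ins for the question's classes; no Dataflow dependency.
public class ErasureDemo {
  // Simplified stand-in for DoFn: only the output type parameter is kept.
  static abstract class Fn<OutputT> {}

  static class Processor<T> {
    // Like ParDoFn: an inner class whose output type mentions the outer T.
    class ParDoFn extends Fn<List<T>> {}

    Fn<List<T>> newFn() {
      return new ParDoFn();
    }
  }

  // Reads the output type argument declared in the Fn subclass's "extends" clause.
  static Type declaredOutputType(Fn<?> fn) {
    ParameterizedType superType = (ParameterizedType) fn.getClass().getGenericSuperclass();
    return superType.getActualTypeArguments()[0];
  }

  public static void main(String[] args) {
    Processor<String> strings = new Processor<String>();
    Processor<Integer> ints = new Processor<Integer>();

    // Both instantiations share one runtime class; String and Integer are erased.
    System.out.println(strings.getClass() == ints.getClass()); // true

    // The inner class only knows its output type in terms of the variable T.
    System.out.println(declaredOutputType(strings.newFn())); // java.util.List<T>
  }
}
```

Both instantiations share a single runtime class, and the inner class can only report its output type as List<T>. That is the situation the "unknown due to erasure" error describes.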

To recover that type information, ParDoFn needs to know statically which type was provided for T. This takes two steps.

1. Create an anonymous subclass of Processor

PCollection<String> input = ...;
input.apply(new Processor<String>() {});

This ensures that, for all inner classes of this instance of Processor, the type variable T is statically bound to String. Unlike a plain instantiation, an anonymous subclass records its superclass as Processor<String> in its class metadata. In this case it is probably best to make Processor an abstract class, so that consumers are required to subclass it.

2. Override getOutputTypeDescriptor of ParDoFn to resolve its types against the outer class Processor.

class Processor<T extends Serializable> extends ... {
  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
        Processor.this.getClass()) {};
    }
  }
}
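Both steps can be mimicked in plain Java to show why they work. In this minimal sketch, `Fn.outputType()` is a hypothetical stand-in for getOutputTypeDescriptor(). `render` is a deliberately small, hypothetical resolver that only substitutes a type variable bound directly by the enclosing object's generic superclass. It illustrates the idea behind passing Processor.this.getClass() to the TypeDescriptor. It is not Dataflow's implementation, and a real type-resolution library handles far more cases:

```java
import java.lang.reflect.GenericDeclaration;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical stand-ins: Fn plays DoFn, outputType() plays getOutputTypeDescriptor().
public class OuterResolutionDemo {
  static abstract class Fn<OutputT> {
    // The output type exactly as declared; it may still contain type variables.
    Type declaredOutput() {
      return ((ParameterizedType) getClass().getGenericSuperclass())
          .getActualTypeArguments()[0];
    }

    String outputType() {
      return declaredOutput().getTypeName();
    }
  }

  static abstract class Processor<T> {
    class ParDoFn extends Fn<List<T>> {
      // Analogue of the getOutputTypeDescriptor() override: resolve the declared
      // type against the runtime class of the enclosing Processor instance.
      @Override
      String outputType() {
        return render(declaredOutput(), Processor.this.getClass());
      }
    }
  }

  // Renders a type, substituting any type variable that context's direct generic
  // superclass binds. Deliberately minimal: deeper hierarchies are not searched.
  static String render(Type type, Class<?> context) {
    if (type instanceof TypeVariable) {
      TypeVariable<?> var = (TypeVariable<?>) type;
      Type superType = context.getGenericSuperclass();
      if (superType instanceof ParameterizedType) {
        ParameterizedType parameterized = (ParameterizedType) superType;
        GenericDeclaration owner = var.getGenericDeclaration();
        if (parameterized.getRawType().equals(owner)) {
          TypeVariable<?>[] params = owner.getTypeParameters();
          for (int i = 0; i < params.length; i++) {
            if (params[i].equals(var)) {
              return parameterized.getActualTypeArguments()[i].getTypeName();
            }
          }
        }
      }
      return var.getName(); // no binding recorded: T stays unresolved
    }
    if (type instanceof ParameterizedType) {
      ParameterizedType p = (ParameterizedType) type;
      StringJoiner typeArgs = new StringJoiner(", ", "<", ">");
      for (Type arg : p.getActualTypeArguments()) {
        typeArgs.add(render(arg, context));
      }
      return ((Class<?>) p.getRawType()).getName() + typeArgs;
    }
    return type.getTypeName();
  }

  public static void main(String[] args) {
    // Step 1: the anonymous subclass records Processor<String> as its superclass.
    Processor<String> bound = new Processor<String>() {};
    // Step 2: ParDoFn resolves T against that anonymous subclass.
    System.out.println(bound.new ParDoFn().outputType()); // java.util.List<java.lang.String>
    // Resolving against Processor itself finds no binding, so T remains.
    System.out.println(render(bound.new ParDoFn().declaredOutput(), Processor.class)); // java.util.List<T>
  }
}
```

Neither step is enough on its own. Without the anonymous subclass, there is no recorded binding to find. Without resolving against Processor.this.getClass(), ParDoFn only inspects its own declaration, which mentions T and nothing else.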

The complete working version of the code from the question is then the following. Note again that none of this will be necessary once GitHub issue #57 is resolved.

class CustomType<T extends Serializable> { ... }

abstract class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    ...

    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>> 
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }

  class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }

  @Override
  public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {

    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
        items.apply(ParDo.of(new ParDoFn()));

    PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
        partitionedItems.apply(
          Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
              new Merger()));

    return combinedItems;
  }
}

PCollection<String> input = …;
input.apply(new Processor<String>() {});

This is not the only solution: you could also override Processor.getDefaultOutputCoder, or explicitly call setCoder on the intermediate partitionedItems collection. However, this approach seems to be the most general for this use case.
