flink - using dagger injections - not serializable?

Question

I'm using Flink (latest via git) to stream from Kafka to Cassandra. To ease unit testing, I'm adding dependency injection via Dagger.

The ObjectGraph seems to be setting itself up properly but the 'inner objects' are being flagged as 'not serializable' by Flink. If I include these objects directly they work - so what's the difference?

The class in question implements MapFunction and uses @Inject for a module for Cassandra and one for reading config files.

Is there a way to build this so I can use late binding or does Flink make this impossible?

fwiw - Dependency injection (via Dagger) and RichMapFunction can't coexist. Dagger won't let you include any objects that have extends in their definition.

Objects instantiated via Dagger Lazy<T> won't serialize either.

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding$1

Answer

Before diving into the specifics of the question, a bit of background on serializability of functions in Apache Flink:

Apache Flink uses Java serialization (java.io.Serializable) to ship the function objects (here the MapFunction) to the workers that execute them in parallel. Because of that, the functions need to be serializable: the function may not contain any non-serializable fields, i.e. fields whose types are not primitive (int, long, double, ...) and do not implement java.io.Serializable.
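To illustrate the constraint independently of Flink, here is a minimal sketch using plain Java serialization. The class names (NonSerializableDep, MyFunction) are made up for the example; NonSerializableDep stands in for something like a Cassandra session wrapper:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Stand-in for a non-serializable dependency (e.g. a Cassandra session).
    static class NonSerializableDep { }

    // Shaped like a function object: Serializable itself, but holding a
    // field whose type does not implement java.io.Serializable.
    static class MyFunction implements Serializable {
        NonSerializableDep dep = new NonSerializableDep();
    }

    // Returns true if the object survives plain Java serialization.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;   // the same failure Flink reports when shipping the function
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable("a plain string")); // true
        System.out.println(isJavaSerializable(new MyFunction())); // false
    }
}
```

The function object itself implementing Serializable is not enough; every non-transient field it holds must be serializable too.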

The typical way to work with non-serializable constructs is to lazily initialize them.

One way to use non-serializable types in Flink functions is to lazily initialize them. The fields that hold these types are still null when the function is serialized to be shipped, and only set after the function has been deserialized by the workers.

  • In Scala, you can simply use lazy fields, for example lazy val x = new NonSerializableType(). The NonSerializableType is actually only created upon first access to the variable x, which usually happens on the worker. Consequently, the type can be non-serializable, because x is null when the function is serialized for shipping to the workers.

  • In Java, you can initialize the non-serializable fields in the open() method of the function, if you make it a rich function. Rich functions (like RichMapFunction) are extended versions of the basic functions (here MapFunction) and give you access to life-cycle methods like open() and close().
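The rich-function pattern can be mimicked without Flink: a transient field is skipped by Java serialization, and an open()-style lifecycle method re-creates it after deserialization. This is a sketch under that assumption; the class and method names here are illustrative stand-ins, not Flink's API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyInitDemo {

    // Stand-in for the non-serializable dependency.
    static class NonSerializableDep {
        String handle(String v) { return "handled:" + v; }
    }

    // Mirrors the shape of a RichMapFunction: the field is transient, so it
    // is null while the object is shipped, and open() sets it on the worker.
    static class MyRichMap implements Serializable {
        transient NonSerializableDep dep;

        void open() {                        // Flink would call this after deserialization
            dep = new NonSerializableDep();
        }

        String map(String value) {
            return dep.handle(value);
        }
    }

    // Serialize and deserialize, as Flink does when shipping to a worker.
    static MyRichMap roundTrip(MyRichMap f) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);              // succeeds: the transient field is skipped
        }
        try (ObjectInputStream in = new ObjectInputStream(
                 new ByteArrayInputStream(bytes.toByteArray()))) {
            return (MyRichMap) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyRichMap onWorker = roundTrip(new MyRichMap());
        onWorker.open();                     // lifecycle call before processing starts
        System.out.println(onWorker.map("x")); // prints handled:x
    }
}
```

The transient keyword does the same job here that the lazy val does in Scala: the dependency simply never travels through serialization.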

I am not too familiar with dependency injection, but Dagger seems to provide something like a lazy dependency as well, which may help as a workaround, much like lazy variables in Scala:

new MapFunction<Long, Long>() {

  // the dependency is only created on the first call to get() -
  // by then the function has already been deserialized on the worker
  @Inject Lazy<MyDependency> dep;

  public Long map(Long value) {
    return dep.get().doSomething(value);
  }
};
