flink - using dagger injections - not serializable?

Problem description

I'm using Flink (latest via git) to stream from Kafka to Cassandra. To ease unit testing, I'm adding dependency injection via Dagger.

The ObjectGraph seems to be setting itself up properly but the 'inner objects' are being flagged as 'not serializable' by Flink. If I include these objects directly they work - so what's the difference?

The class in question implements MapFunction and @Injects two modules: one for Cassandra and one for reading config files.

Is there a way to build this so I can use late binding or does Flink make this impossible?

FWIW: dependency injection (via Dagger) and RichMapFunction can't coexist. Dagger won't let you include any objects that have extends in their definition.

Objects instantiated via Dagger Lazy<T> won't serialize either.

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding$1


Recommended answer

Before diving into the specifics of the question, a bit of background on serializability of functions in Apache Flink:

Apache Flink uses Java serialization (java.io.Serializable) to ship the function objects (here the MapFunction) to the workers that execute them in parallel. Because of that, the functions need to be serializable: a function must not contain any non-serializable fields, i.e. fields whose types are neither primitive (int, long, double, ...) nor implement java.io.Serializable.
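
The failing check can be reproduced with plain JDK classes. This is a minimal sketch that assumes nothing about Flink's internals beyond the fact that Flink's MapFunction extends java.io.Serializable; FakeSession and EagerFunction are hypothetical stand-ins for a Cassandra client and a user function that creates it eagerly. Writing such a function to an ObjectOutputStream fails with NotSerializableException, the same exception that appears as the cause in the stack trace in the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    // Hypothetical stand-in for a client such as a Cassandra session.
    // It does NOT implement java.io.Serializable.
    static class FakeSession {
        String query(long v) { return "row-" + v; }
    }

    // Hypothetical user function. It implements Serializable (as Flink's MapFunction does),
    // but it holds a non-serializable field that is created eagerly on the client.
    static class EagerFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeSession session = new FakeSession();

        String map(long v) { return session.query(v); }
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object in the graph does not implement Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new EagerFunction())); // false
        System.out.println(isSerializable(42L));                  // true (a boxed Long)
    }
}
```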

One way to use non-serializable types in Flink functions is to lazily initialize them. The fields that hold these types are still null when the function is serialized to be shipped, and only set after the function has been deserialized by the workers.
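
A minimal Java sketch of this lazy-initialization idea, without Flink or Dagger on the classpath. FakeSession, LazyFunction and roundTrip are hypothetical names, and roundTrip only simulates shipping by serializing and deserializing a copy; the copy plays the role of the function "on the worker", where the field is created on first use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyInitSketch {

    // Hypothetical non-serializable client.
    static class FakeSession {
        String query(long v) { return "row-" + v; }
    }

    static class LazyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        // Null until first use. 'transient' is this sketch's own choice: it also keeps the
        // field out of the serialized form if it was already created on the client.
        private transient FakeSession session;

        private FakeSession session() {
            if (session == null) {
                session = new FakeSession(); // created on first access (normally on the worker)
            }
            return session;
        }

        String map(long v) { return session().query(v); }

        boolean initialized() { return session != null; }
    }

    // Simulates shipping: serialize the object, then deserialize a fresh copy.
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        LazyFunction shipped = roundTrip(new LazyFunction()); // no NotSerializableException
        System.out.println(shipped.initialized()); // false
        System.out.println(shipped.map(7));        // row-7
        System.out.println(shipped.initialized()); // true
    }
}
```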

  • In Scala, you can simply use lazy fields, for example lazy val x = new NonSerializableType(). The NonSerializableType instance is only created upon first access to the variable x, which usually happens on the worker. Consequently, the type can be non-serializable, because x is still null when the function is serialized for shipping to the workers.

In Java, you can initialize the non-serializable fields in the open() method of the function if you make it a Rich Function. Rich functions (like RichMapFunction) are extended versions of basic functions (here MapFunction) that give you access to life-cycle methods like open() and close().
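
The open()/close() pattern can also be sketched in plain Java. To keep it runnable without Flink, SessionMapper only mirrors the shape of a rich function and main() calls open(), map() and close() by hand; in real code the class would extend RichMapFunction and Flink would invoke those life-cycle methods on the worker. FakeSession, SessionMapper and ship are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OpenLifecycleSketch {

    // Hypothetical non-serializable client (e.g. a database session).
    static class FakeSession {
        boolean closed = false;
        String query(long v) { return "row-" + v; }
        void close() { closed = true; }
    }

    // Mirrors the shape of a rich function: open() once, map() per record, close() at the end.
    static class SessionMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        transient FakeSession session; // never set on the client, so nothing to serialize

        void open() { session = new FakeSession(); }

        String map(long v) { return session.query(v); }

        void close() { if (session != null) session.close(); }
    }

    // Serialize and deserialize, standing in for shipping the function to a worker.
    static SessionMapper ship(SessionMapper f) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SessionMapper) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SessionMapper worker = ship(new SessionMapper()); // serializes fine: session is null
        worker.open();                                   // worker-side initialization
        System.out.println(worker.map(3));               // row-3
        worker.close();
    }
}
```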

I am not too familiar with dependency injection, but Dagger seems to provide something like a lazy dependency as well, which may serve as a workaround much like lazy variables in Scala:

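import dagger.Lazy;
import javax.inject.Inject;
import org.apache.flink.api.common.functions.MapFunction;

MapFunction<Long, Long> mapper = new MapFunction<Long, Long>() {
  // Dagger's Lazy<T> defers creating MyDependency until get() is first called
  @Inject Lazy<MyDependency> dep;

  @Override
  public Long map(Long value) {
    return dep.get().doSomething(value);
  }
};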
