Using a Collections$UnmodifiableCollection with Apache Flink
Question
While using Apache Flink with the following code:
DataStream<List<String>> result = source
    .window(Time.of(1, TimeUnit.SECONDS))
    .mapWindow(new WindowMapFunction<String, List<String>>() {
        @Override
        public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
            List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
            collector.collect(top5);
        }
    })
    .flatten();
I get this exception:
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
How can I use the UnmodifiableCollection with Flink?
Recommended answer
The problem is that Kryo's default CollectionSerializer cannot deserialize the collection: it rebuilds the collection by calling .add() for each element, and that call throws UnsupportedOperationException because the collection is not modifiable.
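The failure can be reproduced without Flink or Kryo. The following is a minimal sketch using only the JDK; the class UnmodifiableAddDemo and its refill method are hypothetical names that merely imitate the "add each decoded element" step visible in the stack trace, not Kryo's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class UnmodifiableAddDemo {

    // Hypothetical stand-in for the read path of a generic collection
    // serializer: add every decoded element to an already-created target.
    // Returns false if the target rejects add().
    static boolean refill(Collection<String> target, List<String> decoded) {
        try {
            for (String element : decoded) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> decoded = Arrays.asList("a", "b", "c");

        Collection<String> mutable = new ArrayList<String>();
        Collection<String> readOnly =
                Collections.unmodifiableCollection(new ArrayList<String>());

        System.out.println("ArrayList refilled: " + refill(mutable, decoded));
        System.out.println("Unmodifiable refilled: " + refill(readOnly, decoded));
    }
}
```

The second call fails for the same reason as the job above: the wrapper returned by the Collections.unmodifiable* methods rejects every add().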
To resolve the issue, we can use the UnmodifiableCollectionsSerializer from the kryo-serializers project. Flink transitively depends on the project, so there is no need to add it as a dependency.
Next, we have to register the serializer with Flink's Kryo instances.
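// UnmodifiableCollectionsSerializer comes from de.javakaffee.kryoserializers (the kryo-serializers project).
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// Class.forName throws the checked ClassNotFoundException, so declare or handle it.
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);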
Usually, we don't have to call Class.forName() to register a serializer, but in this case java.util.Collections$UnmodifiableCollection is package-private, so we cannot reference the class directly in our code.
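The lookup by name can be checked in isolation. This is a small JDK-only sketch; PackagePrivateLookupDemo and its helper method are hypothetical names introduced for illustration. It loads the class by its binary name, confirms that it is not public, and checks that a list wrapped by Collections.unmodifiableList is an instance of it.

```java
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class PackagePrivateLookupDemo {

    // Loads the nested JDK class by its binary name, because writing
    // Collections.UnmodifiableCollection.class outside java.util does not compile.
    static Class<?> unmodifiableCollectionClass() {
        try {
            return Class.forName("java.util.Collections$UnmodifiableCollection");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("JDK class not found", e);
        }
    }

    public static void main(String[] args) {
        Class<?> unmodColl = unmodifiableCollectionClass();

        // The class is declared without the public modifier.
        System.out.println("public: " + Modifier.isPublic(unmodColl.getModifiers()));

        // Unmodifiable list wrappers are subclasses of UnmodifiableCollection.
        List<String> wrapped = Collections.unmodifiableList(new ArrayList<String>());
        System.out.println("list wrapper is an instance: " + unmodColl.isInstance(wrapped));
    }
}
```

The second check matters for the registration above: as far as we understand Kryo's default-serializer lookup, it matches by assignability, so registering the base class should also cover the list, set, and map wrappers that extend it.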