Using a Collections$UnmodifiableCollection with Apache Flink


Question

While using Apache Flink with the following code:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {

    @Override
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
        List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
        collector.collect(top5);
    }
}).flatten();

I get this exception:

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

How can I use the UnmodifiableCollection with Flink?

Recommended answer

The problem is that Kryo's default CollectionSerializer cannot deserialize the collection, because the collection is unmodifiable: the serializer rebuilds the collection by calling .add() on it, and that call fails.
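The failure can be reproduced without Flink or Kryo. The sketch below assumes, as the stack trace suggests, that deserialization repopulates the collection through add(); the class and method names (UnmodifiableAddDemo, addIsRejected) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableAddDemo {

    // Returns true if add() on the given collection throws
    // UnsupportedOperationException, the same failure seen in the stack trace.
    public static boolean addIsRejected(Collection<String> target) {
        try {
            target.add("x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(Arrays.asList("a", "b"));
        Collection<String> unmodifiable = Collections.unmodifiableCollection(plain);

        System.out.println(addIsRejected(unmodifiable)); // true
        System.out.println(addIsRejected(plain));        // false
    }
}
```

Any serializer that fills a collection element by element hits the same wall, which is why a serializer that knows about the unmodifiable wrappers is needed.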

To resolve the issue, we can use the UnmodifiableCollectionsSerializer from the kryo-serializers project. Flink already depends on that project transitively, so there is no need to add it as a separate dependency.

Next, we have to register the serializer with Flink's Kryo instances.

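import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);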

Usually, we don't have to call Class.forName() to register a serializer, but in this case java.util.Collections$UnmodifiableCollection is package-private, so we cannot reference the class directly.
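The name-based lookup can be checked on its own with plain JDK classes. This is a minimal sketch; the class name UnmodifiableClassLookup and its lookup() helper are hypothetical and only serve the demonstration.

```java
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class UnmodifiableClassLookup {

    // Loads the nested class by its binary name, because it cannot be
    // written as a class literal from outside the java.util package.
    public static Class<?> lookup() {
        try {
            return Class.forName("java.util.Collections$UnmodifiableCollection");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("JDK class not found", e);
        }
    }

    public static void main(String[] args) {
        Collection<String> wrapped =
                Collections.unmodifiableCollection(new ArrayList<String>());

        // The looked-up class is the runtime class of the wrapper.
        System.out.println(lookup() == wrapped.getClass()); // true

        // The class is not public, hence the lookup by name.
        System.out.println(Modifier.isPublic(lookup().getModifiers())); // false
    }
}
```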
