Apache Flink: executing a program which extends the RichFlatMapFunction on the remote cluster causes error


Problem description

I have the following code in Apache Flink. It works fine on the local cluster, but running it on the remote cluster throws a NullPointerException on the line containing the statement "stack.push(recordPair);".

Does anyone know what the reason is?

The input dataset is the same for both the local and the remote cluster.

public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;
    private static Stack<Tuple2<Integer, Integer>> stack;

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
        stack = new Stack<Tuple2<Integer, Integer>>();
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            ...
        }
    }
}

Recommended answer

The problem is that you initialize the stack variable in the constructor of the TC class. This initializes the static variable only in the JVM in which the client program runs. For local execution this works, because the Flink job is executed in the same JVM.

When you run it on the cluster, your TC will be serialized and shipped to the cluster nodes. There, deserializing the instance does not call the constructor again, so stack is never initialized. To make this work, you should move the initialization logic into the open method of the RichFlatMapFunction or use a static initializer. But be aware that all operator instances running in the same TaskManager will share the same stack instance, because it is a class variable.

public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;
    // either use a static initializer
    private static Stack<Tuple2<Integer, Integer>> stack = new Stack<Tuple2<Integer, Integer>>();

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
    }

    @Override
    public void open(Configuration config) {
        // or initialize stack here, but then you have to synchronize the initialization
        ...
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            ...
        }
    }
}
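
As an alternative to a static stack shared within a TaskManager JVM, the same fix can be expressed with non-static fields that are created in open(). The following is a minimal, self-contained sketch of that variant, not the original poster's full code: the TreeSet and HashMap fields and the remaining logic are omitted, the class is shown as a top-level class, and the out.collect call is added only so the example does something observable.

import java.util.List;
import java.util.Stack;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

    // Serializable constructor argument; it is shipped to the TaskManagers together with the function.
    private final List<Tuple2<Integer, Integer>> duplicatsPairs;

    // Non-static, transient working state: created per parallel subtask in open(),
    // i.e. after the function has been deserialized on the cluster node.
    private transient Stack<Tuple2<Integer, Integer>> stack;

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        this.duplicatsPairs = duplicatsPairs;
    }

    @Override
    public void open(Configuration config) {
        stack = new Stack<Tuple2<Integer, Integer>>();
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            // forward the pair unchanged, just to make the sketch complete
            out.collect(recordPair);
        }
    }
}

Because the fields are instance variables, each parallel subtask gets its own stack after deserialization, so no synchronization of the initialization is needed.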
