Apache Flink: executing a program that extends RichFlatMapFunction on the remote cluster causes an error
Problem description
I have the following code in Apache Flink. It works fine on the local cluster, but running it on the remote cluster throws a NullPointerException in the line containing `stack.push(recordPair);`.
Does anyone know what the reason is?
The input dataset is the same for both the local and the remote cluster.
```java
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;
    private static Stack<Tuple2<Integer, Integer>> stack;

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
        stack = new Stack<Tuple2<Integer, Integer>>();
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            ...
        }
    }
}
```
Recommended answer
The problem is that you initialize the `stack` variable in the constructor of the `TC` class. This initializes the static variable only in the JVM in which the client program runs. For local execution this works, because the Flink job is executed in the same JVM.
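The difference between the client JVM and the cluster can be reproduced with plain Java serialization, which Flink uses to ship user functions such as `TC` to the workers. This is a simplified sketch under stated assumptions: `Fn` is a hypothetical stand-in for `TC`, both "JVMs" are simulated inside one process, and a fresh TaskManager JVM is imitated by resetting the static field to `null` before deserializing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Stack;

public class ShipFunctionDemo {

    // Hypothetical stand-in for TC. Flink user functions are Serializable,
    // so this class is too.
    static class Fn implements Serializable {
        private static final long serialVersionUID = 1L;

        static int constructorCalls = 0;  // how often the constructor has run
        static Stack<Integer> stack;      // static: never written to the serialized form

        Fn() {
            constructorCalls++;
            stack = new Stack<>();        // only initializes the JVM that runs the constructor
        }
    }

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // Builds Fn on the "client", ships it, and returns what the "worker" receives.
    static Fn shipToWorker() {
        byte[] shipped = serialize(new Fn());  // client side: the constructor runs here
        Fn.stack = null;                       // simulate a fresh worker JVM: statics start out null
        return (Fn) deserialize(shipped);      // worker side: Fn() is NOT called again
    }

    public static void main(String[] args) {
        Fn onWorker = shipToWorker();
        System.out.println("constructor calls: " + Fn.constructorCalls);
        System.out.println("stack on worker: " + Fn.stack);
        System.out.println("instance received: " + (onWorker != null));
    }
}
```

Running `main` prints `constructor calls: 1` and `stack on worker: null`: the object arrives, but neither a second constructor call nor the static field comes with it, which is the state in which `stack.push(recordPair)` throws the NullPointerException.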
When you run it on the cluster, your `TC` instance is serialized and shipped to the cluster nodes. There, deserializing the instance does not call the constructor again, so `stack` is never initialized (static fields are not part of the serialized form either). To make this work, move the initialization logic to the `open` method of the `RichFlatMapFunction`, or use a static initializer. Be aware, however, that all `TC` instances (for example, parallel subtasks) running on the same `TaskManager` will then share the same `stack` instance, because it is a class variable.
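```java
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// declared as a nested class of the job's main class
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
    private static HashMap<Integer, Set<Integer>> clusters_duplicate_map;

    // Either use a static initializer: it runs when the class is loaded,
    // i.e. in every TaskManager JVM as well, not only on the client.
    private static Stack<Tuple2<Integer, Integer>> stack = new Stack<Tuple2<Integer, Integer>>();

    public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
        ...
    }

    @Override
    public void open(Configuration config) {
        // ...or initialize stack here. open() runs on the TaskManager after
        // deserialization, once per parallel instance; because stack is
        // static, this initialization has to be synchronized.
        ...
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
        if (recordPair != null) {
            stack.push(recordPair);
            ...
        }
    }
}
```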
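The sharing caveat can be seen without Flink at all. The plain-Java sketch below uses two hypothetical classes: `StaticStackFn` keeps the stack in a static field, as the answer's code does, while `PerInstanceFn` keeps it in a non-static `transient` field created in an `open()` method. That `open()` only mimics Flink's `RichFunction#open` (it is called by hand here), and the per-instance variant is a common alternative rather than something the answer above proposes.

```java
import java.util.Stack;

public class SharedStateDemo {

    // Like the answer's TC: the stack is a class variable, so every
    // instance of this class in the same JVM pushes into the same object.
    static class StaticStackFn {
        static final Stack<Integer> stack = new Stack<>();

        void flatMap(int record) {
            stack.push(record);
        }
    }

    // Alternative: a non-static field created in an open()-style hook.
    // open() is a hypothetical stand-in for RichFunction#open; here it is
    // called by hand instead of by Flink.
    static class PerInstanceFn {
        private transient Stack<Integer> stack;

        void open() {
            stack = new Stack<>();
        }

        void flatMap(int record) {
            stack.push(record);
        }

        int size() {
            return stack.size();
        }
    }

    // Two "parallel instances" of the static variant share one stack.
    static int sharedSizeAfterTwoInstances() {
        StaticStackFn.stack.clear();
        StaticStackFn a = new StaticStackFn();
        StaticStackFn b = new StaticStackFn();
        a.flatMap(1);
        b.flatMap(2);
        return StaticStackFn.stack.size();
    }

    // Two instances of the per-instance variant each get their own stack.
    static int[] perInstanceSizes() {
        PerInstanceFn a = new PerInstanceFn();
        PerInstanceFn b = new PerInstanceFn();
        a.open();
        b.open();
        a.flatMap(1);
        b.flatMap(2);
        return new int[] {a.size(), b.size()};
    }

    public static void main(String[] args) {
        System.out.println("static variant, shared stack size: " + sharedSizeAfterTwoInstances());
        int[] sizes = perInstanceSizes();
        System.out.println("per-instance sizes: " + sizes[0] + ", " + sizes[1]);
    }
}
```

This prints a shared size of 2 for the static variant and sizes of 1 and 1 for the per-instance variant. In a real job the parallel instances run in separate threads, so a shared static stack would also interleave records from different subtasks; a per-instance field avoids that, provided the logic does not actually need a single stack across all subtasks.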