Flink: error when running the WordCount example on a remote cluster

Problem Description

I have a Flink cluster on VirtualBox with three nodes: one master and two slaves. I customized the WordCount example and built a fat jar to run it on this remote VirtualBox Flink cluster, but I got an error.

Note: I added the dependencies to the project manually (in IntelliJ IDEA) and did not use Maven to manage them. I tested my code on my local machine and it worked fine!

More details follow.

Here is my Java code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final int port;
        final String ip;
        DataSet<String> text;
        try {
            ip = params.get("ip");
            port = params.getInt("port");
            // Environment that submits the job to the remote JobManager at ip:port (default parallelism 2)
            final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
            text = env.readTextFile(params.get("input"));
        } catch (Exception e) {
            System.err.println("No port or input or ip specified. Please run 'WordCount --ip <ip> --port <port>" +
                    " --input <input>'");
            return;
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        System.out.println("Printing result to stdout.");
        // print() triggers execution on the cluster and collects the result back to the client
        counts.print();

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

}
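Because the job worked locally but not on the cluster, it can help to know exactly what output the program should produce. The sketch below is a plain-Java (no Flink) approximation of what `Tokenizer` followed by `groupBy(0).sum(1)` computes: lower-case each line, split on `\W+`, drop empty tokens, and sum a count of 1 per word. The class name `LocalWordCount` is hypothetical, and it uses a `TreeMap` only to make the output order deterministic; Flink's `print()` does not guarantee any particular order.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java approximation of Tokenizer + groupBy(0).sum(1), for local sanity checks.
class LocalWordCount {

    // Same normalization as Tokenizer.flatMap, then a per-word sum of 1s.
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys, unlike Flink's output
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {            // skip empty tokens (e.g. leading punctuation)
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = count(Arrays.asList("To be, or not to be:", "that is the question."));
        // Print each entry in the "(word,count)" shape that Flink's Tuple2.toString() uses
        result.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

Comparing this against the cluster's printed result separates "the job did not run" from "the job ran but produced something unexpected".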

I created the ExecutionEnvironment object with:

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);

Then I ran the code on the host machine (which runs VirtualBox and is connected to the cluster nodes) with the following command:

java -cp FlinkWordCountClusetr.jar WordCount --ip 192.168.101.10 --port 6123 --input /usr/local/flink/LICENSE
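The command passes three `--key value` pairs that `ParameterTool.fromArgs` reads. The `catch` block in the code above prints the same "No port or input or ip specified" message for any exception in the `try` block, so a typo in one argument is hard to tell apart from other failures. A minimal stdlib-only sketch of a stricter pre-check, assuming only the `--key value` form (a simplification of what `ParameterTool` accepts); `ArgCheck` and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-check: parses "--key value" pairs and reports exactly which required keys are missing.
class ArgCheck {

    static Map<String, String> parse(String[] args) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // A key followed by another "--key" (or by nothing) gets no value
                boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
                map.put(key, hasValue ? args[++i] : null);
            }
        }
        return map;
    }

    // Returns the required keys that are absent or have no value, in the order given.
    static List<String> missingKeys(Map<String, String> params, String... required) {
        List<String> result = new ArrayList<>();
        for (String key : required) {
            if (params.get(key) == null) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(args);
        List<String> missing = missingKeys(params, "ip", "port", "input");
        if (!missing.isEmpty()) {
            System.err.println("Missing argument(s): " + missing
                    + ". Usage: WordCount --ip <ip> --port <port> --input <input>");
            System.exit(1);
        }
        try {
            Integer.parseInt(params.get("port"));
        } catch (NumberFormatException e) {
            System.err.println("--port must be an integer, got: " + params.get("port"));
            System.exit(1);
        }
        System.out.println("Arguments OK: " + params);
    }
}
```

With the command above, all three keys are present, so an error like the one below comes from job submission rather than from argument parsing.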

But I got the following error (summarized):

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'

How can I solve this?

Recommended Answer

After trying many settings, it turned out the problem was that the Maven dependencies did not match the Flink version installed on the remote cluster. The Maven dependencies were Flink 1.3.2 built for Scala 2.10, while the Flink installed on the remote cluster was 1.3.2 built for Scala 2.11. It is only a minor difference, but it matters!
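Flink artifacts that depend on Scala carry the Scala binary version in their name (for example `flink-clients_2.10` versus `flink-clients_2.11`), so this kind of mismatch is often visible just from jar file names. The sketch below is a hypothetical helper, `FlinkJarCheck`, that parses names of the form `flink-<name>[_<scala>]-<version>.jar` and flags client jars whose Flink version or Scala suffix differs from a cluster jar. It assumes you collect the client jar names from the manually added IntelliJ libraries and the cluster jar name from the cluster's `lib/` directory (a `flink-dist_<scala>-<version>.jar` file is typical there); the regular expression is an assumption about naming and may not cover every artifact.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: compares Flink jar names on the client with the cluster's jar and
// reports Flink-version or Scala-suffix mismatches. The naming pattern is an assumption.
class FlinkJarCheck {

    // "flink-clients_2.10-1.3.2.jar" -> name=flink-clients, scala=2.10, version=1.3.2
    // "flink-java-1.3.2.jar"         -> name=flink-java,    scala=null, version=1.3.2
    private static final Pattern JAR =
            Pattern.compile("^(flink-[a-z0-9-]+?)(?:_(2\\.\\d+))?-(\\d+\\.\\d+\\.\\d+)\\.jar$");

    static final class Artifact {
        final String name;
        final String scala;   // null when the artifact has no Scala suffix
        final String version;

        Artifact(String name, String scala, String version) {
            this.name = name;
            this.scala = scala;
            this.version = version;
        }
    }

    // Returns null for file names that do not look like Flink artifacts.
    static Artifact parse(String fileName) {
        Matcher m = JAR.matcher(fileName);
        return m.matches() ? new Artifact(m.group(1), m.group(2), m.group(3)) : null;
    }

    static List<String> mismatches(String clusterJar, List<String> clientJars) {
        Artifact cluster = parse(clusterJar);
        if (cluster == null) {
            throw new IllegalArgumentException("Unrecognized cluster jar: " + clusterJar);
        }
        List<String> problems = new ArrayList<>();
        for (String jar : clientJars) {
            Artifact a = parse(jar);
            if (a == null) {
                continue; // not a Flink artifact, nothing to compare
            }
            if (!a.version.equals(cluster.version)) {
                problems.add(jar + ": Flink " + a.version + " vs cluster " + cluster.version);
            }
            if (a.scala != null && cluster.scala != null && !a.scala.equals(cluster.scala)) {
                problems.add(jar + ": Scala " + a.scala + " vs cluster " + cluster.scala);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Mirrors the situation in the answer: Scala 2.10 client jars, Scala 2.11 cluster
        List<String> problems = mismatches("flink-dist_2.11-1.3.2.jar",
                Arrays.asList("flink-java-1.3.2.jar", "flink-clients_2.10-1.3.2.jar"));
        problems.forEach(System.out::println);
    }
}
```

Here only `flink-clients_2.10-1.3.2.jar` is reported, because `flink-java` carries no Scala suffix and its Flink version matches; replacing the `_2.10` jars with their `_2.11` counterparts at the same Flink version is the fix the answer describes.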
