Flink: error on running WordCount example on remote cluster

Question

I have a Flink cluster on VirtualBox consisting of three nodes: 1 master and 2 slaves. I customized the WordCount example and built a fat jar file to run it against the remote Flink cluster on VirtualBox, but I got an error.

Note: I imported the dependencies into the project manually (using IntelliJ IDEA) and did not use Maven as the dependency provider. I tested my code on my local machine and it worked fine.

More details follow. Here is my Java code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

// *************************************************************************
//     PROGRAM
// *************************************************************************

public static void main(String[] args) throws Exception {

    final ParameterTool params = ParameterTool.fromArgs(args);
    final int port;
    final String ip;
    DataSet<String> text;
    try {
        ip = params.get("ip");
        port = params.getInt("port");
        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
        text = env.readTextFile(params.get("input"));
    } catch (Exception e) {
        System.err.println("No ip, port or input specified. Please run 'WordCount --ip <ip> --port <port>" +
                " --input <input>'");
        return;
    }

    DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);

    System.out.println("Printing result to stdout.");
    counts.print();

}

// *************************************************************************
//     USER FUNCTIONS
// *************************************************************************

/**
 * Implements the string tokenizer that splits sentences into words as a user-defined
 * FlatMapFunction. The function takes a line (String) and splits it into
 * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

}

I created the ExecutionEnvironment object with this statement:

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);

I ran the code with the following command on the host machine (the machine that runs VirtualBox and is connected to the cluster nodes):

java -cp FlinkWordCountClusetr.jar WordCount --ip 192.168.101.10 --port 6123 --input /usr/local/flink/LICENSE

But I got the following error (summarized):

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'

How can I fix it?

Recommended answer

After trying many settings, it turned out that the Maven dependencies did not match the Flink build installed on the remote cluster. The Maven dependencies were Flink 1.3.2 built for Scala 2.10, while the Flink installed on the remote cluster was 1.3.2 built for Scala 2.11. A minor difference, but an important one!
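
One rough way to spot this kind of mismatch is to compare the Scala suffix in the jar names on both sides, since Scala-dependent Flink artifacts carry the Scala binary version in their name (for example `_2.10` or `_2.11`). The sketch below is only an illustration of that idea: the class name `ScalaSuffixCheck` and the two jar file names are hypothetical, so substitute the names you actually find among your project's dependencies and in the cluster's Flink lib directory.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScalaSuffixCheck {

    // Matches the Scala binary version suffix in names such as
    // "flink-clients_2.10-1.3.2.jar" (captures "2.10").
    private static final Pattern SCALA_SUFFIX = Pattern.compile("_(\\d+\\.\\d+)-");

    /** Returns the Scala binary version encoded in a jar file name, if there is one. */
    public static Optional<String> scalaSuffix(String jarName) {
        Matcher m = SCALA_SUFFIX.matcher(jarName);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** True only when both names carry a Scala suffix and the two suffixes are equal. */
    public static boolean sameScalaVersion(String clientJar, String clusterJar) {
        Optional<String> client = scalaSuffix(clientJar);
        Optional<String> cluster = scalaSuffix(clusterJar);
        return client.isPresent() && client.equals(cluster);
    }

    public static void main(String[] args) {
        // Hypothetical names: a dependency packed into the fat jar,
        // and a jar found in the cluster's Flink lib directory.
        String clientJar = "flink-clients_2.10-1.3.2.jar";
        String clusterJar = "flink-dist_2.11-1.3.2.jar";

        System.out.println("client Scala:  " + scalaSuffix(clientJar).orElse("unknown"));
        System.out.println("cluster Scala: " + scalaSuffix(clusterJar).orElse("unknown"));
        System.out.println("match: " + sameScalaVersion(clientJar, clusterJar));
    }
}
```

If the two suffixes differ, as they did in my case, replace the project dependencies with the ones built for the cluster's Scala version and rebuild the fat jar.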
