External checkpoints to S3 on EMR

Problem description

I am trying to deploy a production cluster for my Flink program. I am using a standard hadoop-core EMR cluster with Flink 1.3.2 installed, using YARN to run it.

I am trying to configure my RocksDB state backend to write my checkpoints to an S3 bucket. I am trying to go through these docs: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem. The problem seems to be getting the dependencies working correctly. I receive this error when trying to run the program:

java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)

I have tried both adjusting the core-site.xml and leaving it as is. I have tried setting HADOOP_CLASSPATH to /usr/lib/hadoop/share, which contains (what I assume are) most of the JARs described in the above guide. I tried downloading the Hadoop 2.7.2 binaries and copying them into the flink/lib directory. All of these resulted in the same error.
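For reference, a minimal sketch of how the Hadoop classpath is usually exported on an EMR master; the hadoop classpath command is standard Hadoop CLI, but the /etc/hadoop/conf path is an assumption about this particular cluster, not something from the original post:

# Hedged sketch: export EMR's own Hadoop config dir and jar list so Flink
# resolves the cluster's (matching) Hadoop classes instead of bundled ones.
export HADOOP_CONF_DIR=/etc/hadoop/conf    # default EMR location (assumption)
export HADOOP_CLASSPATH=$(hadoop classpath)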

Has anyone successfully gotten Flink to write to S3 on EMR?

My cluster setup

AWS Portal:

1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster (I do fill out names/keys/etc.)

Once the cluster is up I ssh into the Master and do the following:

1  wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3  cd flink-1.3.2
4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5  Change conf/flink-conf.yaml 
6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar

In my conf/flink-conf.yaml I add the following fields:

state.backend: rocksdb
state.backend.fs.checkpointdir: s3://bucket/location
state.checkpoints.dir: s3://bucket/location
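The linked guide also describes pointing Flink at the Hadoop configuration directory via the fs.hdfs.hadoopconf key. A minimal sketch of adding that, assuming the usual EMR config location of /etc/hadoop/conf (verify the path on the master node):

# Hedged sketch: append the Hadoop config pointer from the Flink 1.3 AWS guide
# to flink-conf.yaml (run from the flink-1.3.2 directory).
echo "fs.hdfs.hadoopconf: /etc/hadoop/conf" >> conf/flink-conf.yaml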

My program's checkpointing setup:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

env.enableCheckpointing(getCheckpointRate, CheckpointingMode.EXACTLY_ONCE)
// Keep checkpoints around when the job is cancelled so it can be restored later.
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// The second argument enables incremental RocksDB checkpoints.
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

If there are any steps you think I am missing, please let me know.

Recommended answer

I assume that you installed Flink 1.3.2 on your own on the EMR YARN cluster, because Amazon does not yet offer Flink 1.3.2, right?

Given that, it seems as if you have a dependency conflict. The method org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;) was only introduced with Hadoop 2.4.0. Therefore, I assume you have deployed a Flink 1.3.2 version which was built against Hadoop 2.3.0. Please deploy a Flink version built against the Hadoop version running on EMR. This will most likely resolve all dependency conflicts.
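A minimal sketch of that fix, assuming a source build is acceptable: Flink's Maven build accepts a hadoop.version override, so the distribution can be compiled against the exact Hadoop 2.7.3 that emr-5.8.0 ships:

# Build Flink 1.3.2 against EMR's Hadoop version (2.7.3 on emr-5.8.0).
wget http://archive.apache.org/dist/flink/flink-1.3.2/flink-1.3.2-src.tgz
tar -xzf flink-1.3.2-src.tgz
cd flink-1.3.2
mvn clean install -DskipTests -Dhadoop.version=2.7.3
# The assembled distribution then sits under build-target/.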

Putting the Hadoop dependencies into the lib folder does not seem to work reliably, because the flink-shaded-hadoop2-uber.jar appears to take precedence on the classpath.
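One way to check what the shaded jar actually bundles is to extract its Configuration class and look for the missing overload; the jar name below is taken from the error message and is an assumption, so adjust it to the actual file under lib/:

# Hedged diagnostic: does the bundled Hadoop Configuration class offer the
# addResource(Configuration) overload that was introduced in Hadoop 2.4.0?
unzip -o lib/flink-shaded-hadoop2-uber-1.3.2.jar 'org/apache/hadoop/conf/Configuration.class' -d /tmp/shaded
javap -cp /tmp/shaded org.apache.hadoop.conf.Configuration | grep addResource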
