External checkpoints to S3 on EMR


Question


I am trying to deploy a production cluster for my Flink program. I am using a standard hadoop-core EMR cluster with Flink 1.3.2 installed, using YARN to run it.

I am trying to configure my RocksDB state backend to write my checkpoints to an S3 bucket. I am trying to go through these docs: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem. The problem seems to be getting the dependencies working correctly. I receive this error when trying to run the program:

java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
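For reference, the S3 filesystem setup in the guide linked above boils down to registering a Hadoop S3 implementation in core-site.xml, roughly like this (a sketch of the s3a variant from the guide, not my exact file):

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- local directories used to buffer large results prior to upload -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>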

I have tried both adjusting the core-site.xml and leaving it as is. I have tried setting the HADOOP_CLASSPATH to the /usr/lib/hadoop/share that contains (what I assume are) most of the JARs described in the above guide. I tried downloading the hadoop 2.7.2 binaries and copying them into the flink/lib directory. All attempts resulted in the same error.
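Roughly what those two classpath attempts looked like (paths approximate, from memory):

# attempt 1: point at EMR's own Hadoop jars
export HADOOP_CLASSPATH=/usr/lib/hadoop/share

# attempt 2: copy standalone Hadoop 2.7.2 jars into Flink's lib directory
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz
tar -xzf hadoop-2.7.2.tar.gz
cp hadoop-2.7.2/share/hadoop/common/*.jar flink-1.3.2/lib/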

Has anyone successfully gotten Flink to write to S3 on EMR?

EDIT: My cluster setup

AWS Portal:

1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster (I do fill out names/keys/etc.)

Once the cluster is up I ssh into the Master and do the following:

1  wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3  cd flink-1.3.2
4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5  Change conf/flink-conf.yaml 
6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar

In my conf/flink-conf.yaml I add the following fields:

state.backend: rocksdb
state.backend.fs.checkpointdir: s3://bucket/location
state.checkpoints.dir: s3://bucket/location

My program's checkpointing setup:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

env.enableCheckpointing(getCheckpointRate, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// RocksDBStateBackend(checkpointDataUri, enableIncrementalCheckpointing)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

If there are any steps you think I am missing, please let me know

Solution

I assume that you installed Flink 1.3.2 yourself on the EMR YARN cluster, because Amazon does not yet offer Flink 1.3.2, right?

Given that, it seems as if you have a dependency conflict. The method org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V was only introduced with Hadoop 2.4.0. Therefore I assume that you have deployed a Flink 1.3.2 version which was built against Hadoop 2.3.0. Please deploy a Flink version built against the Hadoop version running on EMR. This will most likely resolve the dependency conflicts.
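Since EMR ships its own patched Hadoop build, one option along the lines of the Flink build docs is to verify the exact version and build Flink against it. A sketch, assuming you have Maven and the Flink 1.3.2 sources:

# on the EMR master node: confirm the Hadoop version (emr-5.8.0 should report 2.7.3)
hadoop version

# then build Flink against that exact Hadoop version
mvn clean install -DskipTests -Dhadoop.version=2.7.3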

Putting the Hadoop dependencies into the lib folder does not seem to work reliably, because the flink-shaded-hadoop2-uber.jar appears to take precedence in the classpath.
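One way to see this conflict is to check which jars in Flink's lib folder actually bundle Hadoop's Configuration class; a quick sketch (jar names may differ on your install):

# list every jar in Flink's lib folder that contains Hadoop's Configuration class
cd flink-1.3.2/lib
for j in *.jar; do
  unzip -l "$j" 2>/dev/null | grep -q 'org/apache/hadoop/conf/Configuration.class' && echo "$j"
done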
