Flink checkpoints to Google Cloud Storage


Problem description


I am trying to configure checkpoints for Flink jobs in GCS. Everything works fine if I run a test job locally (no Docker and no cluster setup), but it fails with an error if I run it with docker-compose or a cluster setup and deploy the fat jar with the jobs through the Flink dashboard.

Any thoughts on this? Thanks!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

The environment configuration is like this:

StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);
checkpointConfig.setCheckpointInterval(10000);
checkpointConfig.setMinPauseBetweenCheckpoints(5000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
env.setStateBackend((StateBackend) rocksDBStateBackend);

Here is my core-site.xml file:

<configuration>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${user.dir}/key.json</value>
</property>
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>The FileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.gs.application.name.suffix</name>
    <value>-kube-flink</value>
    <description>
        Appended to the user-agent header for API requests to GCS to help identify
        the traffic as coming from Dataproc.
    </description>
</property>
</configuration>

Dependency on gcs-connector:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>1.9.4-hadoop2</version>
</dependency>

UPDATE:

After some manipulation of the dependencies, I've been able to write checkpoints. My current setup is:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>1.5.1</version>
</dependency>

I also switched the Flink image to version flink:1.5.2-hadoop28.
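For reference, a minimal docker-compose sketch for such an image could look like the following; the service layout and the JOB_MANAGER_RPC_ADDRESS variable are assumptions based on the standard Flink Docker images, not the exact compose file used here:

version: "2.1"
services:
  jobmanager:
    image: flink:1.5.2-hadoop28
    command: jobmanager
    ports:
      - "8081:8081"   # Flink dashboard
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.5.2-hadoop28
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager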

Unfortunately, I am still not able to read checkpoint data, as my job always fails while restoring state with this error:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

I believe this is going to be the last error...

Solution

Finally I found the solution here.

You must create your own image and put the gcs-connector into the lib directory. Otherwise you will always get classloading issues (user code vs. system classloaders).
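As a side note (an assumption on my part, not something spelled out above), the connector should then also stay out of the fat jar itself, so that the same classes do not end up in both the user-code and the system classloader; one way to sketch that with Maven is to mark the dependency as provided in the job's pom.xml:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
        <!-- provided: the connector comes from the image's lib directory, not the fat jar -->
        <scope>provided</scope>
</dependency>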

To create a custom Docker image, we create the following Dockerfile:

FROM registry.platform.data-artisans.net/trial/v1.0/flink:1.4.2-dap1-scala_2.11

# Add the GCS connector and Flink's shaded Hadoop 2.8 package to the lib directory
RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar && \
    wget http://ftp.fau.de/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    tar xf flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    mv flink-1.4.2/lib/flink-shaded-hadoop2* lib/ && \
    rm -r flink-1.4.2*

# Copy the service account key and the Hadoop configuration into the image
RUN mkdir etc-hadoop
COPY <name of key file>.json etc-hadoop/
COPY core-site.xml etc-hadoop/

ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["jobmanager"]

The Docker image will be based on the Flink image we're providing as part of the dA Platform trial. We are adding the Google Cloud Storage connector, Flink's Hadoop package, and the key file together with the configuration file.

To build the custom image, the following files should be in your current directory: core-site.xml, Dockerfile and the key-file (.json).
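For example, the directory contents might look like this (assuming the key file is named key.json, matching the core-site.xml above):

$ ls
core-site.xml  Dockerfile  key.json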

To finally trigger the build of the custom image, we run the following command:

$ docker build -t flink-1.4.2-gs .

Once the image has been built, we will upload the image to Google’s Container Registry. To configure Docker to properly access the registry, run this command once:

$ gcloud auth configure-docker

Next, we’ll tag and upload the container:

$ docker tag flink-1.4.2-gs:latest eu.gcr.io/<your project id>/flink-1.4.2-gs
$ docker push eu.gcr.io/<your project id>/flink-1.4.2-gs

Once the upload is complete, we need to set the custom image for an Application Manager deployment. Send the following PATCH request:

PATCH /api/v1/deployments/<your AppMgr deployment id>
 spec:
   template:
     spec:
       flinkConfiguration:
         fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
       artifact:
         flinkImageRegistry: eu.gcr.io
         flinkImageRepository: <your project id>/flink-1.4.2-gs
         flinkImageTag: latest

Alternatively, use the following curl command:

$ curl -X PATCH --header 'Content-Type: application/yaml' --header 'Accept: application/yaml' -d '  spec: \ 
    template: \ 
      spec: \ 
        flinkConfiguration:
          fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
        artifact: \ 
          flinkImageRegistry: eu.gcr.io \ 
          flinkImageRepository: <your project id>/flink-1.4.2-gs \ 
          flinkImageTag: latest' 'http://localhost:8080/api/v1/deployments/<your AppMgr deployment id>'

With this change implemented, you'll be able to checkpoint to Google Cloud Storage. Use the following pattern when specifying the checkpoint directory: gs:///checkpoints. For savepoints, set the state.savepoints.dir Flink configuration option.
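For reference, a minimal sketch of the matching flink-conf.yaml entries could look like this (the bucket name and paths are placeholders/assumptions, not values taken from the deployment above):

# Sketch of the relevant Flink options; <bucket> is a placeholder
state.backend: rocksdb
state.checkpoints.dir: gs://<bucket>/checkpoints
state.savepoints.dir: gs://<bucket>/savepoints
fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/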
