Flink checkpoints to Google Cloud Storage

Question

I am trying to configure checkpoints for Flink jobs in GCS. Everything works fine if I run a test job locally (without Docker or any cluster setup), but it fails with the error below if I run it using docker-compose or a cluster setup and deploy the fat jar with the jobs through the Flink dashboard.

Any thoughts on this? Thanks!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

The environment is configured as follows:

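// Imports used by this fragment (Flink 1.5.x)
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// applicationContext (Spring) and jobClass come from the surrounding application code.
StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);  // tasks decline a failed checkpoint instead of failing
checkpointConfig.setCheckpointInterval(10000);         // trigger a checkpoint every 10 s
checkpointConfig.setMinPauseBetweenCheckpoints(5000);  // wait at least 5 s after one checkpoint before the next
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// "checkpoints" is the GCS bucket name; the second argument enables incremental checkpoints.
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
// The cast selects setStateBackend(StateBackend) rather than the deprecated AbstractStateBackend overload.
env.setStateBackend((StateBackend) rocksDBStateBackend);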

Here is my core-site.xml file:

<configuration>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${user.dir}/key.json</value>
</property>
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>The FileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.gs.application.name.suffix</name>
    <value>-kube-flink</value>
    <description>
        Appended to the user-agent header for API requests to GCS to help identify
        the traffic as coming from Dataproc.
    </description>
</property>
</configuration>
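Since this file only helps if it parses and actually declares the GCS properties, it can be worth checking it before baking it into an image. The following is a minimal JDK-only sketch, assuming the standard Hadoop configuration/property/name/value layout; the class CoreSiteCheck and the choice of "required" keys are illustrative assumptions, not part of Flink, Hadoop, or the GCS connector. Unlike Hadoop's own Configuration class, it does not expand ${...} variables such as ${user.dir}.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public final class CoreSiteCheck {

    // Keys copied from the core-site.xml above; treating them as "required" is an assumption.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "fs.gs.impl",
            "fs.AbstractFileSystem.gs.impl",
            "google.cloud.auth.service.account.json.keyfile");

    /** Trimmed text of the first <tag> element under parent, or "" if there is none. */
    private static String childText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent().trim();
    }

    /** Reads every <property><name/><value/></property> pair into an ordered map (no ${...} expansion). */
    static Map<String, String> readProperties(InputStream xml) throws Exception {
        Element root = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(xml)
                .getDocumentElement();
        Map<String, String> props = new LinkedHashMap<>();
        NodeList properties = root.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element property = (Element) properties.item(i);
            props.put(childText(property, "name"), childText(property, "value"));
        }
        return props;
    }

    /** Required keys that are absent or have an empty value. */
    static List<String> missingKeys(Map<String, String> props) {
        return REQUIRED_KEYS.stream()
                .filter(key -> props.getOrDefault(key, "").isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Trimmed-down sample; for the real file use Files.newInputStream(Paths.get("core-site.xml")).
        String sample = "<configuration><property><name>fs.gs.impl</name>"
                + "<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>"
                + "</property></configuration>";
        Map<String, String> props =
                readProperties(new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8)));
        System.out.println("missing: " + missingKeys(props));
    }
}
```

Running main on the sample prints the two keys the sample leaves out; pointing readProperties at the real file checks the core-site.xml that will actually be shipped.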

Dependency on gcs-connector:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>1.9.4-hadoop2</version>
</dependency>

Update:

After some experimenting with the dependencies, I have been able to write checkpoints. My current setup is:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>1.5.1</version>
</dependency>

I also switched the Flink image to flink:1.5.2-hadoop28.

Unfortunately, I am still not able to read the checkpoint data: my job always fails while restoring state, with this error:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

I believe it's going to be the last error...

Recommended answer

Finally, I found the solution here:

You must build your own image and put the gcs-connector jar into Flink's lib directory. Otherwise you will keep running into classloading issues between the user-code and system classloaders.
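To see which side of that split a class ends up on, a small diagnostic can resolve a class name through a given classloader and report which loader actually defines it. This is a minimal JDK-only sketch; ClassVisibility and its methods are hypothetical names, and the two class names are the ones from the stack traces and core-site.xml in the question.

```java
public final class ClassVisibility {

    // Class names taken from the stack traces and core-site.xml in the question.
    static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
    static final String GCS_FS_CLASS = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem";

    /** True if className can be resolved through loader (static initializers are not run). */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Reports which loader actually defines className when it is resolved through loader. */
    static String describe(String className, ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(className, false, loader);
            ClassLoader definer = cls.getClassLoader(); // null means the bootstrap loader
            return className + " -> " + (definer == null ? "bootstrap" : definer.toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> NOT VISIBLE (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        for (String name : new String[] {HADOOP_FS_CLASS, GCS_FS_CLASS}) {
            System.out.println(describe(name, system));
        }
    }
}
```

Inside a job, the same describe call could be made from a RichFunction's open() method, once with getRuntimeContext().getUserCodeClassLoader() and once with ClassLoader.getSystemClassLoader(). If the connector is visible only through the user-code loader, it is packaged only in the fat jar, which is the setup the answer recommends replacing with a copy in lib/.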

To build a custom Docker image, we use the following Dockerfile:

FROM registry.platform.data-artisans.net/trial/v1.0/flink:1.4.2-dap1-scala_2.11

# Put the GCS connector and Flink's shaded Hadoop jar into lib/ so they are loaded by the system classloader
RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar && \
    wget http://ftp.fau.de/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    tar xf flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
    mv flink-1.4.2/lib/flink-shaded-hadoop2* lib/ && \
    rm -r flink-1.4.2*

RUN mkdir etc-hadoop
COPY <name of key file>.json etc-hadoop/
COPY core-site.xml etc-hadoop/

ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["jobmanager"]

The Docker image is based on the Flink image provided as part of the dA Platform trial. On top of it, we add the Google Cloud Storage connector, Flink's Hadoop package (the flink-shaded-hadoop2 jar), and the key file together with the core-site.xml configuration.

To build the custom image, the following files should be in your current directory: core-site.xml, the Dockerfile, and the key file (.json).

To finally trigger the build of the custom image, we run the following command:

$ docker build -t flink-1.4.2-gs .

Once the image has been built, we will upload the image to Google’s Container Registry. To configure Docker to properly access the registry, run this command once:

$ gcloud auth configure-docker

Next, we’ll tag and upload the container:

$ docker tag flink-1.4.2-gs:latest eu.gcr.io/<your project id>/flink-1.4.2-gs
$ docker push eu.gcr.io/<your project id>/flink-1.4.2-gs

Once the upload has completed, we need to set the custom image for an Application Manager deployment. Send the following PATCH request:

PATCH /api/v1/deployments/<your AppMgr deployment id>
spec:
  template:
    spec:
      flinkConfiguration:
        fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
      artifact:
        flinkImageRegistry: eu.gcr.io
        flinkImageRepository: <your project id>/flink-1.4.2-gs
        flinkImageTag: latest

Alternatively, use the following curl command:

$ curl -X PATCH --header 'Content-Type: application/yaml' --header 'Accept: application/yaml' -d '
spec:
  template:
    spec:
      flinkConfiguration:
        fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
      artifact:
        flinkImageRegistry: eu.gcr.io
        flinkImageRepository: <your project id>/flink-1.4.2-gs
        flinkImageTag: latest' 'http://localhost:8080/api/v1/deployments/<your AppMgr deployment id>'
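The same request can also be sent from Java. This is a minimal sketch, assuming Java 11+ (java.net.http); AppManagerPatch, patchBody, and buildRequest are hypothetical names, and the base URL, deployment id, and project id are the placeholders from the curl command. The Application Manager endpoint is known here only from the request shown above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class AppManagerPatch {

    // Same YAML body as the PATCH request above, with the project id filled in.
    static String patchBody(String projectId) {
        return String.join("\n",
                "spec:",
                "  template:",
                "    spec:",
                "      flinkConfiguration:",
                "        fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/",
                "      artifact:",
                "        flinkImageRegistry: eu.gcr.io",
                "        flinkImageRepository: " + projectId + "/flink-1.4.2-gs",
                "        flinkImageTag: latest",
                "");
    }

    /** Builds (but does not send) the PATCH request for one Application Manager deployment. */
    static HttpRequest buildRequest(String baseUrl, String deploymentId, String projectId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/deployments/" + deploymentId))
                .header("Content-Type", "application/yaml")
                .header("Accept", "application/yaml")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(patchBody(projectId)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: AppManagerPatch <baseUrl> <deploymentId> <projectId>");
            return;
        }
        HttpRequest request = buildRequest(args[0], args[1], args[2]);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + "\n" + response.body());
    }
}
```

For example: java AppManagerPatch http://localhost:8080 <your AppMgr deployment id> <your project id>. Keeping request construction separate from sending makes it easy to inspect the request before it hits the cluster.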

With this change in place, you can write checkpoints to Google Cloud Storage. When specifying the checkpoint directory, use the pattern gs:///checkpoints. For savepoints, set the state.savepoints.dir option in the Flink configuration.
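GCS URIs usually take the form gs://<bucket>/<path>. The following is a minimal JDK-only sketch, assuming every checkpoint and savepoint URI should name its bucket explicitly: it builds per-job URIs (mirroring the gs://checkpoints/<job name> pattern from the question) and rejects gs: URIs whose authority, i.e. the bucket, is empty. GcsCheckpointPaths and its methods are hypothetical helpers, not Flink API.

```java
import java.net.URI;

public final class GcsCheckpointPaths {

    /** Builds gs://<bucket>/<prefix>/<jobName> and validates it before returning. */
    static String checkpointUri(String bucket, String prefix, String jobName) {
        String uri = "gs://" + bucket + "/" + prefix + "/" + jobName;
        requireBucket(uri);
        return uri;
    }

    /** Throws if uri is not a gs: URI or has no bucket (empty authority, as in "gs:///x"). */
    static URI requireBucket(String uri) {
        URI parsed = URI.create(uri);
        if (!"gs".equals(parsed.getScheme())) {
            throw new IllegalArgumentException("not a gs:// URI: " + uri);
        }
        String bucket = parsed.getAuthority(); // null when nothing sits between "//" and "/"
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("no bucket name in " + uri);
        }
        return parsed;
    }

    public static void main(String[] args) {
        // "my-bucket" and "MyJob" are placeholders; the question used the bucket "checkpoints".
        System.out.println(checkpointUri("my-bucket", "checkpoints", "MyJob"));
        try {
            requireBucket("gs:///checkpoints");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same requireBucket check can be applied to the value configured for state.savepoints.dir before the job is submitted.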
