Is it possible to use Riak CS with Apache Flink?

Problem description

I want to configure the filesystem state backend and ZooKeeper recovery mode:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

As you can see, I need to specify the checkpointdir and storageDir parameters, but I don't have any of the file systems supported by Apache Flink (such as HDFS or Amazon S3). However, I do have a Riak CS cluster installed (it seems to be S3-compatible).

So, can I use Riak CS together with Apache Flink? If it is possible, how do I configure Apache Flink to work with Riak CS?

Solution

Answer: how to connect Apache Flink and Riak CS

Riak CS has an S3-compatible (version 2) interface, so it is possible to use Hadoop's S3 file system adapter to work with Riak CS.

I don't know why, but Apache Flink ships only some of the Hadoop file system adapters inside its fat jar (lib/flink-dist_2.11-1.0.1.jar): it has the FTP file system (org.apache.hadoop.fs.ftp.FTPFileSystem) but not the S3 file system (i.e. org.apache.hadoop.fs.s3a.S3AFileSystem). So you have two ways to solve this problem:

  • use the adapters from a Hadoop installation. I didn't try this, but it seems you should just configure the HADOOP_CLASSPATH or HADOOP_HOME environment variable.
  • monkey-patch Apache Flink and download the required JARs into the <flink home>/lib directory

I chose the second way because I don't want to provision Hadoop in my environment. You can copy the JARs from a Hadoop distribution or from the internet:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

As you can see, I am using old versions because these are the versions used by Hadoop 2.7.2, and I use a Flink build that is compatible with this version of Hadoop.

FYI: such a hack can cause problems if you use newer versions of these JARs in your own job. To avoid issues caused by the conflicting versions, you can relocate the packages when building your job's fat jar, with something like this (I am using Gradle):

// Relocate the org.apache.http and org.apache.commons packages because Apache Flink's lib
// now contains old versions of these libraries (the JARs we placed there for the S3-compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

Then you should specify the path to core-site.xml in flink-conf.yaml, because Hadoop-compatible file systems use this config to load their settings:

...
fs.hdfs.hadoopconf: /flink/conf
...

As you can see, I just placed it in the <flink home>/conf directory. It has the following settings:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <!-- S3A is preferred over the older S3 adapters: https://wiki.apache.org/hadoop/AmazonS3 -->
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <!-- this is my Riak CS host -->
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>
    </property>
    <property>
        <!-- my Riak CS in staging doesn't support SSL -->
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>
    <property>
        <!-- this is my access key for Riak CS -->
        <name>fs.s3a.access.key</name>
        <value>????</value>
    </property>
    <property>
        <!-- this is my secret key for Riak CS -->
        <name>fs.s3a.secret.key</name>
        <value>????</value>
    </property>
</configuration>
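
Before wiring this into Flink, you can check the S3A adapter against Riak CS on its own. The following is just a sketch (the standalone check and the class name are mine, not part of the original setup): it loads the same core-site.xml and lists the bucket through the S3A file system, assuming hadoop-aws-2.7.2 and its dependencies are on the classpath and that the example-staging-flink bucket already exists (it is created with s3cmd further below).

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RiakCsS3aCheck {
    public static void main(String[] args) throws Exception {
        // Load the same settings Flink will pick up via fs.hdfs.hadoopconf.
        Configuration conf = new Configuration();
        conf.addResource(new Path("/flink/conf/core-site.xml"));

        // The s3a:// scheme selects S3AFileSystem as configured in core-site.xml.
        FileSystem fs = FileSystem.get(URI.create("s3a://example-staging-flink/"), conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}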

Then you should configure the Riak CS buckets in flink-conf.yaml, as recommended here:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

and create the buckets in Riak CS. I am using s3cmd (installed via brew in my OS X dev environment):

s3cmd mb s3://example-staging-flink

FYI: before using s3cmd you should configure it with s3cmd --configure and then fix a few settings in the ~/.s3cfg file:

# Riak CS uses the S3 V2 interface
signature_v2 = True
# set to False if you don't use SSL
use_https = False
access_key = ???
secret_key = ???
# your Riak CS host
host_base = my-riak-cs.stage.local
# bucket name format used by Riak CS
host_bucket = %(bucket).my-riak-cs.stage.local
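
If you want to double-check the S3 (version 2) interface from Java as well, not only from s3cmd, the aws-java-sdk-1.7.4 copied into /flink/lib above can talk to Riak CS directly. This is only a sketch with placeholder credentials (use the same access/secret key as in core-site.xml); the class name is arbitrary.

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.Bucket;

public class RiakCsSdkCheck {
    public static void main(String[] args) {
        // Placeholder credentials: use the same keys as in core-site.xml.
        AmazonS3Client s3 = new AmazonS3Client(
                new BasicAWSCredentials("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"));
        // Point the client at Riak CS instead of AWS (no SSL in my staging setup).
        s3.setEndpoint("http://my-riak-cs.stage.local");

        // Should print example-staging-flink among your buckets.
        for (Bucket bucket : s3.listBuckets()) {
            System.out.println(bucket.getName());
        }
    }
}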

So, that's all you need to configure to save/restore the state of a standalone HA Apache Flink cluster in Riak CS.
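
One more note: checkpoints only appear under s3a://example-staging-flink/checkpoints once a job actually enables checkpointing. A minimal job sketch (the socket source, the 30-second interval and the class name are just placeholders, not part of the original setup):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingToRiakCs {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot operator state every 30 seconds; with the configuration above
        // the snapshots land under s3a://example-staging-flink/checkpoints.
        env.enableCheckpointing(30000);

        // Placeholder pipeline, replace with your own sources and sinks.
        env.socketTextStream("localhost", 9999)
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .print();

        env.execute("checkpointing-to-riak-cs");
    }
}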
