Is it possible to use Riak CS with Apache Flink?

Problem description

I want to configure the filesystem state backend and the ZooKeeper recovery mode:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

As you can see, I should specify the checkpointdir and storageDir parameters, but I don't have any of the file systems supported by Apache Flink (like HDFS or Amazon S3). However, I do have a Riak CS cluster installed (it seems to be S3-compatible).

So, can I use Riak CS together with Apache Flink? And if it is possible: how do I configure Apache Flink to work with Riak CS?

Recommended answer

Answer: how to connect Apache Flink and Riak CS?

Riak CS has an S3 (version 2) compatible interface, so it is possible to use the S3 file system adapter from Hadoop to work with Riak CS.

I don't know why, but Apache Flink ships only part of the Hadoop filesystem adapters inside its fat jar (lib/flink-dist_2.11-1.0.1.jar): it has the FTP file system (org.apache.hadoop.fs.ftp.FTPFileSystem) but not the S3 file system (i.e. org.apache.hadoop.fs.s3a.S3AFileSystem). So you have 2 ways to solve this problem:

  • Use these adapters from a Hadoop installation. I haven't tried this, but it seems you should just configure the HADOOP_CLASSPATH or HADOOP_HOME environment variable (see the sketch after this list).
  • Monkey-patch Apache Flink and download the required JARs into the <flink home>/lib directory
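
For the first way, a rough, untested sketch of what I think that configuration would look like (the /opt/hadoop-2.7.2 install path here is only an example, not part of my setup):

# untested sketch: point Flink at an existing Hadoop installation
# (adjust /opt/hadoop-2.7.2 to your real install path)
export HADOOP_HOME=/opt/hadoop-2.7.2
# 'hadoop classpath' prints the jars of the installation; hadoop-aws and the AWS SDK
# live under share/hadoop/tools/lib, so they are appended explicitly
export HADOOP_CLASSPATH="$("$HADOOP_HOME"/bin/hadoop classpath):$HADOOP_HOME/share/hadoop/tools/lib/*"
# restart Flink so the JVMs pick up the new classpath
/flink/bin/start-cluster.sh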

So, I chose the second way, because I don't want to provision Hadoop in my environment. You can copy the JARs from a Hadoop distribution or from the internet:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
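
A quick sanity check that the JARs really landed in lib/, followed by a restart so Flink picks them up (same /flink install prefix as in the commands above):

ls /flink/lib/hadoop-aws-2.7.2.jar /flink/lib/aws-java-sdk-1.7.4.jar \
   /flink/lib/httpcore-4.2.5.jar /flink/lib/httpclient-4.2.5.jar
# restart so the new jars end up on the JobManager/TaskManager classpath
/flink/bin/stop-cluster.sh && /flink/bin/start-cluster.sh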

As you can see, I am using old versions, because these are the versions shipped with Hadoop 2.7.2, and I use a Flink build that is compatible with that version of Hadoop.

FYI: this hack can cause problems if you use newer versions of these JARs in your own job. To avoid problems related to the different versions, you can relocate the packages when you build your job's fat jar, using something like this (I am using Gradle):

// Relocate the org.apache.http and org.apache.commons packages, because the old versions of
// these libraries now sit in Flink's lib/ (we put them there for the S3-compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
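
After the relocation, the job is built and submitted as usual; the jar name below is just what the Shadow plugin produces by default for a hypothetical project called my-flow, so adjust it to yours:

# build the shaded (relocated) fat jar and submit it to the cluster
./gradlew clean shadowJar
/flink/bin/flink run build/libs/my-flow-all.jar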

Then you should specify the path to core-site.xml in flink-conf.yaml, because the Hadoop-compatible file systems use this config to load their settings:

...
fs.hdfs.hadoopconf: /flink/conf
...

As you can see, I just placed it in the <flink home>/conf directory. It has the following settings:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <!-- S3A is better than the other S3 adapters: https://wiki.apache.org/hadoop/AmazonS3 -->
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <!-- this is my Riak CS host -->
        <value>my-riak-cs.stage.local</value>
    </property>
    <property>
        <!-- my Riak CS in staging doesn't support SSL -->
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <!-- this is my access key for Riak CS -->
        <value>????</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <!-- this is my secret key for Riak CS -->
        <value>????</value>
    </property>
</configuration>
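
Before restarting anything, it doesn't hurt to check that the host from fs.s3a.endpoint is reachable over plain HTTP (SSL is disabled here); an S3-style XML error response is fine, the point is only that it answers:

# connectivity check against the Riak CS endpoint configured above
curl -i http://my-riak-cs.stage.local/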

Then you should configure the Riak CS buckets in flink-conf.yaml, as recommended here:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

and create the buckets in Riak CS. I am using s3cmd (installed via brew in my OS X dev environment):

s3cmd mb s3://example-staging-flink

FYI: before using s3cmd you should configure it with s3cmd --configure and then fix some settings in the ~/.s3cmd file:

# Riak CS uses the S3 V2 signing interface
signature_v2 = True
# set to False if you don't use SSL
use_https = False
access_key = ???
secret_key = ???
# your Riak CS host
host_base = my-riak-cs.stage.local
# bucket addressing format used by Riak CS
host_bucket = %(bucket).my-riak-cs.stage.local

So, that's all you need to configure to save/restore the state of a standalone HA Apache Flink cluster in Riak CS.
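
As a final check, once a job with checkpointing enabled has been running for a while, the saved state should show up under the configured prefixes:

# checkpoints and HA recovery metadata should appear under these prefixes
s3cmd ls s3://example-staging-flink/checkpoints/
s3cmd ls s3://example-staging-flink/recovery/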
