Does Apache Flink AWS S3 Sink require Hadoop for local testing?


Question

I am relatively new to Apache Flink and I am trying to create a simple project that produces a file to an AWS S3 bucket. Based on the documentation, it looks like I am required to have Hadoop installed in order to do this.

How do I set up my local environment to allow me to test this capability? I have installed Apache Flink as well as Hadoop locally. I have made the necessary changes to the core-site.xml configuration for Hadoop and have also added my HADOOP_CONF path to my flink.yaml configuration. When I try to submit my job locally through the Flink UI, I always get the following error:

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

I am assuming that I am missing something in how my environment is set up. Is it possible to do this locally? Any help would be appreciated.

Answer

While you need the Hadoop libraries, you do not have to have Hadoop installed to run locally and write to S3. I just happened to try this out by writing Parquet output, based on an Avro schema and a generated SpecificRecord, to S3. I am running a version of the following code locally through SBT and IntelliJ IDEA. Needed parts:

1) Have the following file specifying the needed Hadoop properties (note: defining the AWS access key/secret key is not recommended; it is better to run on an EC2 instance that has a proper IAM role to read/write to your S3 bucket, but they are needed for local testing):

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS access key (property name defined in org.apache.hadoop.fs.s3a.Constants) -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>
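
If you would rather not keep the credentials in a file, the same S3A properties can also be set programmatically on the Hadoop Configuration before it is handed to the output format. A minimal sketch, assuming the same property names as above (the key values are placeholders):

    import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}

    // Sketch: equivalent programmatic setup of the S3A properties from core-site.xml above.
    val hadoopConfig = new HadoopConfiguration()
    hadoopConfig.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConfig.set("fs.s3a.buffer.dir", "/tmp")
    hadoopConfig.set("fs.s3a.access.key", "YOUR_ACCESS_KEY") // placeholder
    hadoopConfig.set("fs.s3a.secret.key", "YOUR_SECRET_KEY") // placeholder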

2) Imports:

import com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink code using HadoopOutputFormat with the above configuration:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )

    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)
    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

    events.output(hadoopOutputFormat)

    env.execute

    ...

    def getHadoopConfiguration(hadoopConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoopConfigPath))
      hadoopConfig
    }
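
The snippet above leaves env, events, and hadoopConfigFile to the surrounding program. A minimal sketch of that scaffolding, where the config path and the buildEvents helper are hypothetical placeholders:

    import org.apache.flink.api.scala._

    // Batch environment referenced by the env.execute call above.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Path to the core-site.xml from step 1 (placeholder location).
    val hadoopConfigFile = "/path/to/core-site.xml"

    // Hypothetical helper that builds the (Void, EventOnlyRecord) tuples,
    // e.g. by reading source data and pairing each Avro record with a null key.
    val events: DataSet[(Void, EventOnlyRecord)] = buildEvents(env)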

4) Build dependencies and versions used:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )

    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )
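
Assuming an SBT build, those sequences are then wired into the project roughly like this (a sketch; the Scala version is an assumption). Marking the Flink dependencies as provided keeps them out of an assembled fat jar, since a cluster already ships them.

    // build.sbt sketch: pulling the dependency sequences above into the project.
    scalaVersion := "2.11.7"

    libraryDependencies ++= providedFlinkDependencies
    libraryDependencies ++= serializationDependencies
    libraryDependencies ++= s3Dependencies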

Edit for using writeAsText to S3:

1) Create a Hadoop configuration directory (referred to below as hadoop-conf-dir) with a file core-site.xml in it.

For example:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

#content of core-site.xml 
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS access key (property name defined in org.apache.hadoop.fs.s3a.Constants) -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) Create a directory (referred to below as flink-conf-dir) with a file flink-conf.yaml in it.

For example:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) Edit the IntelliJ run configuration used to run your S3 Flink job - Run - Edit Configurations - and add the following environment variable:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) Run the code with that environment variable set:

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute
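
Putting the pieces together, a minimal end-to-end sketch of the writeAsText variant might look like the following. The object name, the sample input, and the job name are assumptions; FLINK_CONF_DIR must point at the flink-conf-dir from steps 2 and 3 so that the S3A configuration is picked up.

import org.apache.flink.api.scala._

object S3WriteJob {
  def main(args: Array[String]): Unit = {
    // Batch environment; the S3A settings arrive via FLINK_CONF_DIR -> flink-conf.yaml -> fs.hdfs.hadoopconf.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical in-memory input, just to have something to write.
    val events: DataSet[String] = env.fromElements("event-1", "event-2", "event-3")

    events.writeAsText("s3://<bucket>/<prefix-dir>")

    env.execute("write-to-s3")
  }
}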

