Does Apache Flink AWS S3 Sink require Hadoop for local testing?

Question

I am relatively new to Apache Flink and I am trying to create a simple project that produces a file to an AWS S3 bucket. Based on the documentation it looks like I am required to have Hadoop installed in order to do this.

How do I set up my local environment to allow me to test this capability? I have installed Apache Flink as well as Hadoop locally. I have added the necessary changes to the core-site.xml configuration for Hadoop and also added my HADOOP_CONF path to my flink.yaml configuration. When I try to submit my job locally through the Flink UI I always get an error:

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

I am assuming that I am missing something with how my environment is set up. Is it possible to do this locally? Any help would be appreciated.

Answer

While you need the Hadoop libraries, you do not have to have Hadoop installed to run locally and write to S3. I happened to try this out by writing Parquet output, based on an Avro schema and a generated SpecificRecord, to S3. I am running a version of the following code locally through SBT and IntelliJ IDEA. The needed parts:

1) Have the following file specifying the needed Hadoop properties. (Note: defining the AWS access key/secret key here is not recommended. It is better to run on an EC2 instance that has a proper IAM role to read/write to your S3 bucket, but the keys are needed for local testing.)

    <configuration>

        <property>
            <name>fs.s3.impl</name>
            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
        </property>

        <!-- Comma separated list of local directories used to buffer
             large results prior to transmitting them to S3. -->
        <property>
            <name>fs.s3a.buffer.dir</name>
            <value>/tmp</value>
        </property>

        <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
        <property>
            <name>fs.s3a.access.key</name>
            <value>YOUR_ACCESS_KEY</value>
        </property>

        <!-- set your AWS secret key -->
        <property>
            <name>fs.s3a.secret.key</name>
            <value>YOUR_SECRET_KEY</value>
        </property>

    </configuration>

2) Imports:

    import com.uebercomputing.eventrecord.EventOnlyRecord

    import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
    import org.apache.flink.api.scala.{ExecutionEnvironment, _}

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job

    import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink code uses HadoopOutputFormat with the above configuration:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )

    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)

    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

    events.output(hadoopOutputFormat)

    env.execute

    ...

    def getHadoopConfiguration(hadoopConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoopConfigPath))
      hadoopConfig
    }
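For context, here is a rough sketch of where the pieces elided above (the env and the events DataSet) might come from. The names below are illustrative assumptions, not part of the original job; the (Void, record) pairing follows the note in the code above:

    // Hypothetical wiring around the snippet above. The source of the
    // records is left elided (???), just as in the original code.
    val env = ExecutionEnvironment.getExecutionEnvironment

    val records: DataSet[EventOnlyRecord] = ??? // however the records are produced

    // AvroParquetOutputFormat works on (key, value) pairs with a Void key,
    // so each record is wrapped as (null, record).
    val events: DataSet[(Void, EventOnlyRecord)] =
      records.map(r => (null.asInstanceOf[Void], r))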

4) Build dependencies and versions used:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )

    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )
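For reference, a minimal sketch of how those values might be wired into a build.sbt; the Scala version below is an assumption, not taken from the answer:

    // Hypothetical build.sbt wiring for the dependency groups defined above.
    // The Scala version is an assumption; adjust it to your project.
    scalaVersion := "2.11.7"

    libraryDependencies ++= providedFlinkDependencies ++
      serializationDependencies ++
      s3Dependencies

Note that providedFlinkDependencies is scoped as provided, which is what you want when packaging a jar for a cluster; when running directly from the IDE you may need the non-provided flinkDependencies on the classpath instead.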

Edit to use writeAsText to S3:

1) Create a Hadoop configuration directory (referred to below as hadoop-conf-dir) containing a file core-site.xml.

For example:

    mkdir /home/<user>/hadoop-config
    cd /home/<user>/hadoop-config
    vi core-site.xml

    #content of core-site.xml
    <configuration>

        <property>
            <name>fs.s3.impl</name>
            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
        </property>

        <!-- Comma separated list of local directories used to buffer
             large results prior to transmitting them to S3. -->
        <property>
            <name>fs.s3a.buffer.dir</name>
            <value>/tmp</value>
        </property>

        <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
        <property>
            <name>fs.s3a.access.key</name>
            <value>YOUR_ACCESS_KEY</value>
        </property>

        <!-- set your AWS secret key -->
        <property>
            <name>fs.s3a.secret.key</name>
            <value>YOUR_SECRET_KEY</value>
        </property>

    </configuration>

2) Create a directory (referred to below as flink-conf-dir) containing a file flink-conf.yaml.

For example:

    mkdir /home/<user>/flink-config
    cd /home/<user>/flink-config
    vi flink-conf.yaml

    # content of flink-conf.yaml - continuing earlier example
    fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) Edit your IntelliJ Run configuration used to run your S3 Flink job - Run - Edit Configurations - and add the following environment variable:

    FLINK_CONF_DIR and set it to your flink-conf-dir

    Continuing the example above:
    FLINK_CONF_DIR=/home/<user>/flink-config

4) Run the code with that environment variable set:
    events.writeAsText("s3://<bucket>/<prefix-dir>")
    env.execute
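Putting the writeAsText pieces together, a minimal end-to-end sketch might look like the following. The object name, job name, and sample data are illustrative assumptions; the S3 placeholders are the same as above and rely on the core-site.xml / flink-conf.yaml / FLINK_CONF_DIR setup from steps 1-3:

    // Minimal sketch of a batch job writing plain text to S3.
    // Sample data and names are hypothetical.
    import org.apache.flink.api.scala._

    object WriteTextToS3 {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // hypothetical in-memory data, just so there is something to write
        val events: DataSet[String] =
          env.fromElements("event-1", "event-2", "event-3")

        events.writeAsText("s3://<bucket>/<prefix-dir>")

        env.execute("write-text-to-s3")
      }
    }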
