Consume GCS files based on pattern from Flink
Question
Flink supports the Hadoop FileSystem abstraction, and there is a GCS connector, a library that implements that abstraction on top of Google Cloud Storage.

How do I create a Flink file source using the code in this repo?
Answer
To achieve this you need to:

- Install and configure the GCS connector on your Flink cluster.
- Add the Hadoop and Flink dependencies (including the HDFS connector) to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
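For the first step, the GCS connector is wired into Hadoop through `core-site.xml`. A minimal sketch, assuming service-account key authentication; the project id and keyfile path are placeholders you must replace:

```xml
<!-- core-site.xml: registers the GCS connector with Hadoop.
     Project id and keyfile path below are illustrative placeholders. -->
<configuration>
  <property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
  </property>
  <property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
  </property>
  <property>
    <name>fs.gs.project.id</name>
    <value>your-project-id</value>
  </property>
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/path/to/keyfile.json</value>
  </property>
</configuration>
```

The connector jar itself must also be on the classpath of every Flink node for these settings to take effect.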
Use it to create a data source with a GCS path:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// The glob characters in the path are expanded by Hadoop, so all
// matching objects under the bucket are read as input.
DataSet<Tuple2<LongWritable, Text>> input =
    env.createInput(
        HadoopInputs.readHadoopFile(
            new TextInputFormat(), LongWritable.class, Text.class,
            "gs://bucket/path/some*pattern/"));
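Hadoop expands glob characters (`*`, `?`, `[...]`) in input paths via `FileSystem.globStatus`, which is why the `gs://bucket/path/some*pattern/` URI above selects multiple objects. As a rough, self-contained illustration of the matching semantics, using the JDK's own glob matcher rather than Hadoop's (edge cases such as `{}` handling may differ):

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobDemo {
    public static void main(String[] args) {
        // "some*pattern" matches any name starting with "some" and
        // ending with "pattern"; '*' does not cross directory separators.
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:some*pattern");
        System.out.println(m.matches(Paths.get("some-2020-pattern"))); // true
        System.out.println(m.matches(Paths.get("other")));             // false
    }
}
```

Note that the match is applied per path segment, so a pattern like the one in the answer selects sibling objects under one prefix, not arbitrarily nested ones.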