根据Flink中的模式使用GCS文件 [英] Consume GCS files based on pattern from Flink
本文介绍了根据Flink中的模式使用GCS文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
由于Flink支持Hadoop FileSystem抽象,并且存在 GCS连接器-实现的库它位于Google Cloud Storage的顶部.
Since Flink supports the Hadoop FileSystem abstraction, and there's a GCS connector - library that implements it on top of Google Cloud Storage.
如何使用此存储库中的代码创建Flink文件源?
How do I create a Flink file source using the code in this repo?
推荐答案
要实现此目的,您需要:
To achieve this you need to:
- 在您的计算机上安装和配置GCS连接器 Flink群集.
- 添加Hadoop和Flink依赖项(包括
- Install and configure GCS connector on your Flink cluster.
- Add Hadoop and Flink dependencies (including HDFS connector) to your project:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
Use it to create data source with GCS path:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input =
env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(), LongWritable.class, Text.class, "gs://bucket/path/some*pattern/"));
这篇关于根据Flink中的模式使用GCS文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文