Consume GCS files based on pattern from Flink


Problem description

Flink supports the Hadoop FileSystem abstraction, and there is a GCS connector, a library that implements it on top of Google Cloud Storage.

How do I create a Flink file source using the code in this repo?

Answer

To achieve this you need to:

  1. Install and configure the GCS connector on your Flink cluster (a core-site.xml sketch follows the dependency list below).
  2. Add the Hadoop and Flink dependencies (including the HDFS connector) to your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
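
For step 1, the gcs-connector jar must be on the cluster's classpath and Hadoop's core-site.xml must map the gs:// scheme to the connector's classes. A minimal sketch, assuming service-account JSON authentication; the keyfile path is a placeholder for your own key:

<!-- core-site.xml: register the GCS connector for gs:// paths -->
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
</property>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <!-- placeholder path: point this at your service-account key -->
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/path/to/service-account-key.json</value>
</property>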

  3. Use it to create a data source with a GCS path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Wrap Hadoop's TextInputFormat as a Flink input format; the GCS
    // connector resolves the gs:// scheme, and Hadoop expands the glob
    // pattern in the path, so every matching file is read.
    DataSet<Tuple2<LongWritable, Text>> input =
        env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class,
                "gs://bucket/path/some*pattern/"));
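
Each Tuple2 in the resulting DataSet holds a line's byte offset (the LongWritable key) and its contents (the Text value), which you would typically map to plain strings. The wildcard in the gs:// path is what gives you pattern-based consumption: Hadoop's FileInputFormat expands glob patterns when it lists input paths.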
    
