根据 Flink 的模式使用 GCS 文件 [英] Consume GCS files based on pattern from Flink

查看:16
本文介绍了根据 Flink 的模式使用 GCS 文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

由于 Flink 支持 Hadoop 文件系统抽象,并且有一个 GCS 连接器 - 实现的库它位于 Google Cloud Storage 之上.

Since Flink supports the Hadoop FileSystem abstraction, and there's a GCS connector - library that implements it on top of Google Cloud Storage.

如何使用此 repo 中的代码创建 Flink 文件源?

How do I create a Flink file source using the code in this repo?

推荐答案

要实现这一点,您需要:

To achieve this you need to:

  1. 在您的设备上安装和配置 GCS 连接器Flink 集群.
  2. 添加 Hadoop 和 Flink 依赖项(包括 HDFS 连接器) 到您的项目:
  1. Install and configure GCS connector on your Flink cluster.
  2. Add Hadoop and Flink dependencies (including HDFS connector) to your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>

  • 使用它创建带有 GCS 路径的数据源:

  • Use it to create data source with GCS path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;
    
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<LongWritable, Text>> input =
        env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class, "gs://bucket/path/some*pattern/"));
    

  • 这篇关于根据 Flink 的模式使用 GCS 文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆