Process Parquet files continuously as DataStreams in Flink's DataStream API

Question

I have a parquet file on HDFS. It is overwritten daily with a new one. My goal is to emit this parquet file continuously - when it changes - as a DataStream in a Flink Job using the DataStream API. The end goal is to use the file content in a Broadcast State, but this is out of scope for this question.

  1. To process a file continuously, there is this very useful API: the Data Sources API (StreamExecutionEnvironment.readFile). More specifically, FileProcessingMode.PROCESS_CONTINUOUSLY: this is exactly what I need. This works for reading/monitoring text files, no problem, but not for Parquet files:

// Partial version 1: the raw file is processed continuously
val path: String = "hdfs://hostname/path_to_file_dir/"
val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path))
// monitor the file continuously every minute
val stream: DataStream[String] = streamExecutionEnvironment.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000)

  2. To process Parquet files, I can use Hadoop input formats via the Hadoop compatibility API (HadoopInputs.readHadoopFile). However, there is no FileProcessingMode parameter in this API, and it processes the file only once:

// Partial version 2: the parquet file is only processed once
val parquetPath: String = "/path_to_file_dir/parquet_0000"
// Parquet read through the Hadoop (mapred) input format
val hadoopInputFormat: HadoopInputFormat[Void, ArrayWritable] = HadoopInputs.readHadoopFile(new MapredParquetInputFormat(), classOf[Void], classOf[ArrayWritable], parquetPath)
val parquetStream: DataStream[(Void, ArrayWritable)] = streamExecutionEnvironment.createInput(hadoopInputFormat)
val stream = parquetStream.map { record =>
  // process the record here ...
}

I would like to somehow combine the two APIs, to process Parquet files continuously via the DataStream API. Have any of you tried something like this?

Answer

After browsing Flink's code, it looks like those two APIs are quite different, and it does not seem possible to merge them together.

The other approach, which I will detail here, is to define your own SourceFunction that will periodically read the file:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[Int] {
  // volatile so that cancel(), called from another thread, is visible to run()
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    while (isRunning) {
      val path = new Path("path_to_parquet_file")
      val conf = new Configuration()

      // read the footer to get the file metadata and schema
      val readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
      val metadata = readFooter.getFileMetaData
      val schema = metadata.getSchema
      val parquetFileReader = new ParquetFileReader(conf, metadata, path, readFooter.getBlocks, schema.getColumns)
      var pages: PageReadStore = null
      try {
        // iterate over the row groups, emitting one record per row
        while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
          val rows = pages.getRowCount
          val columnIO = new ColumnIOFactory().getColumnIO(schema)
          val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
          (0L until rows).foreach { _ =>
            val group = recordReader.read()
            val my_integer = group.getInteger("field_name", 0)
            ctx.collect(my_integer)
          }
        }
      } finally {
        // always release the reader before the next polling cycle
        parquetFileReader.close()
      }

      // do whatever logic suits you to stop "watching" the file
      Thread.sleep(60000)
    }
  }

  override def cancel(): Unit = isRunning = false
}

Then, use the streamExecutionEnvironment to register this source:

val dataStream: DataStream[Int] = streamExecutionEnvironment.addSource(new ParquetSourceFunction)
// do what you want with your new datastream
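
The source above re-reads and re-emits the whole file every minute, whether or not it has changed. Since the goal is to emit the file only when the daily overwrite happens, one possible refinement, shown as a rough sketch below (not tested; the ChangeAwareParquetSource name and the readAndEmit helper are placeholders standing in for the Parquet-reading logic above), is to poll the file's HDFS modification time and only re-read when it moves forward:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: only re-emit the Parquet file when its HDFS modification time changes.
class ChangeAwareParquetSource extends SourceFunction[Int] {
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    val conf = new Configuration()
    val path = new Path("path_to_parquet_file")
    val fs = FileSystem.get(path.toUri, conf)
    var lastModificationTime = Long.MinValue

    while (isRunning) {
      val modificationTime = fs.getFileStatus(path).getModificationTime
      if (modificationTime > lastModificationTime) {
        lastModificationTime = modificationTime
        // placeholder for the ParquetFileReader / GroupRecordConverter loop of ParquetSourceFunction
        readAndEmit(ctx, conf, path)
      }
      Thread.sleep(60000) // poll once a minute
    }
  }

  // Hypothetical helper: wrap the row-group reading code from the answer here.
  private def readAndEmit(ctx: SourceFunction.SourceContext[Int], conf: Configuration, path: Path): Unit = {
    // ... same logic as in ParquetSourceFunction.run ...
  }

  override def cancel(): Unit = isRunning = false
}

If the daily overwrite is not atomic, you may also want to wait for a success marker or a stable file length before reading.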
