在Flink的DataStream API中以数据流的形式连续处理拼花文件 [英] Process continuously parquet files as Datastreams in Flink's DataStream API

查看:55
本文介绍了在Flink的DataStream API中以数据流的形式连续处理拼花文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在HDFS上有一个实木复合地板文件.每天都会用新的覆盖它.我的目标是-使用DataStream API在Flink Job中作为连续发出此实木复合地板文件-当它更改时.最终目标是在广播状态下使用文件内容,但这超出了此问题的范围.

I have a parquet file on HDFS. It is overwritten daily with a new one. My goal is to emit this parquet file continuously - when it changes - as a DataStream in a Flink Job using the DataStream API. The end goal is to use the file content in a Broadcast State, but this is out of scope for this question.

  1. 连续处理文件,有一个非常有用的API:
  1. To process a file continuously, there is this very useful API: Data-sources about datasources. More specifically, FileProcessingMode.PROCESS_CONTINUOUSLY: this is exactly what I need. This works for reading/monitoring text files, no problem, but not for parquet files:

// Partial version 1: the raw file is processed continuously
val path: String = "hdfs://hostname/path_to_file_dir/"
val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path))
// monitor the file continuously every minute
val stream: DataStream[String] = streamExecutionEnvironment.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000)

  1. 要处理镶木地板文件,我可以使用以下API使用 Hadoop输入格式: using-hadoop-输入格式.但是,通过此API没有FileProcessingMode参数,并且该参数仅处理一次文件:
  1. To process parquet files, I can use Hadoop Input Formats using this API: using-hadoop-inputformats. However there is no FileProcessingMode parameter via this API, and this processes the file only once:

// Partial version 2: the parquet file is only processed once
val parquetPath: String = "/path_to_file_dir/parquet_0000"
// raw text format
val hadoopInputFormat: HadoopInputFormat[Void, ArrayWritable] = HadoopInputs.readHadoopFile(new MapredParquetInputFormat(), classOf[Void], classOf[ArrayWritable], parquetPath)
val stream: DataStream[(Void, ArrayWritable)] = streamExecutionEnvironment.createInput(hadoopInputFormat).map { record =>
  // process the record here ...
}

我想以某种方式组合这两个API,以通过DataStream API连续处理Parquet文件.你们有没有尝试过类似的东西?

I would like to somehow combine the two APIs, to process continuously Parquet Files via the DataStream API. Have any of you tried something like this ?

推荐答案

浏览Flink的代码后,看起来这两个APIS相对不同,并且似乎不可能将它们合并在一起.

After browsing Flink's code, looks like that those two APIS are relatively different, and it does not seem possible to merge them together.

我将在这里详细介绍的另一种方法是定义您自己的SourceFunction,它将定期读取文件:

The other approach, which I will detail here, is to define your own SourceFunction that will periodically read the file:

class ParquetSourceFunction extends SourceFunction[Int] {
  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    while (isRunning) {
      val path = new Path("path_to_parquet_file")
      val conf = new Configuration()

      val readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
      val metadata = readFooter.getFileMetaData
      val schema = metadata.getSchema
      val parquetFileReader = new ParquetFileReader(conf, metadata, path, readFooter.getBlocks, schema.getColumns)
      var pages: PageReadStore = null
      try {
        while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
          val rows = pages.getRowCount
          val columnIO = new ColumnIOFactory().getColumnIO(schema)
          val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
          (0L until rows).foreach { _ =>
            val group = recordReader.read()
            val my_integer = group.getInteger("field_name", 0)
            ctx.collect(my_integer)
          }
        }
      }

      // do whatever logic suits you to stop "watching" the file
      Thread.sleep(60000)
    }
  }

  override def cancel(): Unit = isRunning = false
}

然后,使用streamExecutionEnvironment注册此源:

Then, use the streamExecutionEnvironment to register this source:

val dataStream: DataStream[Int] = streamExecutionEnvironment.addSource(new ParquetProtoSourceFunction)
// do what you want with your new datastream

这篇关于在Flink的DataStream API中以数据流的形式连续处理拼花文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆