How to handle the small file problem in Spark Structured Streaming?


Problem Description

I have a scenario in my project where I read Kafka topic messages using spark-sql-2.4.1. I am able to process the data using Structured Streaming. Once the data is received and processed, I need to save it into the respective parquet files in the HDFS store.

I am able to store and read the parquet files. I keep a trigger time of 15 seconds to 1 minute, so the files are very small, which results in many files.

These parquet files need to be read later by Hive queries.
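
For reference, here is a minimal sketch of the pipeline described above, under stated assumptions: the broker address, topic name, output and checkpoint paths are placeholders, and the year/month/day/hour partition layout follows the scheme used in the answer below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-to-parquet").getOrCreate()
import spark.implicits._

// Read the Kafka topic as a stream (broker and topic names are placeholders)
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my-topic")
  .load()

// Keep the payload as a string and derive partition columns; the real processing goes here
val processed = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")
  .withColumn("year", year($"timestamp"))
  .withColumn("month", month($"timestamp"))
  .withColumn("day", dayofmonth($"timestamp"))
  .withColumn("hour", hour($"timestamp"))

// Write parquet on a short trigger, partitioned so Hive can read the output later
val query = processed.writeStream
  .format("parquet")
  .option("path", "/data/output/topic")                    // hypothetical HDFS location
  .option("checkpointLocation", "/data/checkpoints/topic") // hypothetical checkpoint dir
  .partitionBy("year", "month", "day", "hour")
  .trigger(Trigger.ProcessingTime("15 seconds"))
  .start()

query.awaitTermination()

With a trigger in the 15-second to 1-minute range, every micro-batch writes at least one new file per touched partition, which is exactly what produces the many small files mentioned above.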

So, 1) Does this strategy work in a production environment? Or does it lead to a small-file problem later?

2) What are the best practices (i.e. the industry standard) for handling/designing this kind of scenario?

3) How are these kinds of things generally handled in production?

Thanks.

Answer

I know this question is quite old. I had a similar problem and used a Spark Structured Streaming query listener to solve it.

My use case is fetching data from Kafka and storing it in HDFS with year, month, day and hour partitions.

The code below takes the previous hour's partition data, repartitions it, and overwrites the data in the existing partition.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.joda.time.{DateTime, DateTimeZone}

// `config` is the application's own configuration object (not shown here).
// Register the listener so it runs after every micro-batch of the streaming query.
val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config, session))

class AppListener(config: Config, spark: SparkSession) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
  // Fired after each micro-batch; use the batch timestamp to decide whether to merge files
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    this.synchronized { AppListener.mergeFiles(event.progress.timestamp, spark, config) }
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

object AppListener {

  // Note: `config.kafka(...)`, `configs.hdfsLocation`, `configs.writeFormat` and the
  // `currentTs.datetime` / `.toDateTime(pattern)` calls rely on the author's own config
  // classes and implicit helpers (e.g. parsing the batch timestamp string into a Joda
  // DateTime and truncating it to the hour), which are not shown here.
  def mergeFiles(currentTs: String, spark: SparkSession, config: Config): Unit = {
    val configs = config.kafka(config.key.get)
    // Only merge once the current batch timestamp is comfortably past the tracked hour
    if (currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {

      println(
        s"""
           |Current Timestamp     :     ${currentTs}
           |Merge Files           :     ${Processed.ts.minusHours(1)}
           |
           |""".stripMargin)

      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val ts = Processed.ts.minusHours(1)
      val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
      val path = new Path(hdfsPath)

      if (fs.exists(path)) {

        // listLocatedStatus returns a Hadoop RemoteIterator, so drain it into a Scala List,
        // keeping only data files (skip the _SUCCESS marker)
        val remoteIterator = fs.listLocatedStatus(path)
        val hdfsFiles = Iterator.continually(remoteIterator)
          .takeWhile(_.hasNext)
          .map(_.next())
          .filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
          .map(_.getPath)
          .toList

        println(
          s"""
             |Total files in HDFS location  : ${hdfsFiles.length}
             | ${hdfsFiles.length > 1}
             |""".stripMargin)

        if (hdfsFiles.length > 1) {

          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total Available files : ${hdfsFiles.length}
               |Status                : Running
               |
               |""".stripMargin)

          // Read the whole partition and collapse it into a single file in a temp location
          val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
          df.repartition(1)
            .write
            .format(configs.writeFormat)
            .mode("overwrite")
            .save(s"/tmp${hdfsPath}")

          df.unpersist()

          // Then copy the merged output back over the original partition
          spark
            .read
            .format(configs.writeFormat)
            .load(s"/tmp${hdfsPath}")
            .write
            .format(configs.writeFormat)
            .mode("overwrite")
            .save(hdfsPath)

          // Advance the tracked hour, truncated to the start of the hour
          Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total files           : ${hdfsFiles.length}
               |Status                : Completed
               |
               |""".stripMargin)
        }
      }
    }
  }

  def apply(config: Config, spark: SparkSession): AppListener = new AppListener(config, spark)
}

object Processed {
  // Hour that has already been merged, truncated to HH:00:00 in UTC
  var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}

Sometimes the data is huge, so I divide it into multiple files using the logic below; each file ends up around ~160 MB.

// Estimate the DataFrame size from the optimizer statistics, then derive how many
// partitions (and therefore output files) to write
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (bytes.toLong / 1024.0 / 1024.0 / 10240).ceil.toInt

df.repartition(if (numPartitions == 0) 1 else numPartitions)
  .[...]
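
For completeness, here is a hedged sketch of how the truncated write above might be finished; the `writeRepartitioned` helper, the parquet format and the output path are illustrative assumptions, not the answer author's exact code.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: finish the chain started above, writing the DataFrame with the
// partition count derived from the size estimate so each output file stays moderate in size.
def writeRepartitioned(df: DataFrame, numPartitions: Int, outputPath: String): Unit = {
  df.repartition(if (numPartitions == 0) 1 else numPartitions)
    .write
    .format("parquet")         // assumed output format
    .mode(SaveMode.Overwrite)  // assumed; matches the overwrite used in the merge logic above
    .save(outputPath)
}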

Edit - 1

Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes, we can get the size of the actual DataFrame once it is loaded into memory; for example, see the code below.

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

scala> import sys.process._
import sys.process._

scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
