How to handle the small file problem in Spark Structured Streaming?


Problem Description

I have a scenario in my project where I am reading Kafka topic messages using spark-sql-2.4.1. I am able to process the data using structured streaming. Once the data is received and processed, I need to save it into the respective parquet files in the HDFS store.

I am able to store and read the parquet files. I set a trigger interval of between 15 seconds and 1 minute; the files are very small, which results in many files.
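For reference, a minimal sketch of such a pipeline (the topic name, paths, and the 30-second trigger are placeholders, not the asker's actual job) might look like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-to-parquet").getOrCreate()

// Read the Kafka topic as a streaming DataFrame (brokers/topic are placeholders).
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my-topic")
  .load()

// The Kafka value column arrives as binary; cast it to string before parsing/processing.
val processed = kafkaDf.selectExpr("CAST(value AS STRING) AS value", "timestamp")

// Write to parquet on HDFS; with a short trigger, every micro-batch produces
// its own (often small) files, which is the problem described above.
val query = processed.writeStream
  .format("parquet")
  .option("path", "hdfs:///data/events")               // placeholder output path
  .option("checkpointLocation", "hdfs:///chk/events")  // placeholder checkpoint path
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()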

These parquet files need to be read later by Hive queries.

So, 1) does this strategy work in a production environment, or does it lead to a small file problem later?

2) What are the best practices (i.e. the industry standard) for handling/designing this kind of scenario?

3) How are these kinds of things generally handled in production?

Thanks.

Answer

I know this question is quite old. I had a similar problem, and I used a Spark Structured Streaming query listener to solve it.

My use case is fetching data from Kafka and storing it in HDFS with year, month, day, and hour partitions.
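For context, a hedged sketch of what such a partitioned streaming sink can look like, continuing the hypothetical `processed` stream from the sketch in the question above (column names and paths are illustrative; the answer does not show its actual writer):

import org.apache.spark.sql.functions.{col, year, month, dayofmonth, hour}

// Derive partition columns from the event timestamp, then let the streaming
// writer lay the data out as year=/month=/day=/hour= directories in HDFS.
val partitioned = processed
  .withColumn("year",  year(col("timestamp")))
  .withColumn("month", month(col("timestamp")))
  .withColumn("day",   dayofmonth(col("timestamp")))
  .withColumn("hour",  hour(col("timestamp")))

partitioned.writeStream
  .format("parquet")
  .partitionBy("year", "month", "day", "hour")
  .option("path", "hdfs:///data/events")               // placeholder, plays the role of configs.hdfsLocation below
  .option("checkpointLocation", "hdfs:///chk/events")  // placeholder
  .start()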

The code below takes the previous hour's partition data, applies repartitioning, and overwrites the data in the existing partition.

// Note: `Config`, `config.kafka(...)`, `configs.hdfsLocation`, `configs.writeFormat`,
// the `currentTs.datetime` conversion and `toDateTime("yyyy-MM-dd'T'HH:00:00")`
// come from helper/implicit code that is not shown in this answer.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.joda.time.{DateTime, DateTimeZone}

val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config, session))

// Listener that fires after every micro-batch and triggers the merge of the
// previous hour's partition.
class AppListener(config: Config, spark: SparkSession) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    this.synchronized { AppListener.mergeFiles(event.progress.timestamp, spark, config) }
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

object AppListener {

  def mergeFiles(currentTs: String, spark: SparkSession, config: Config): Unit = {
    val configs = config.kafka(config.key.get)
    // Only merge once the current batch timestamp is at least 5 minutes past
    // the hour tracked in Processed.ts.
    if (currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {

      println(
        s"""
           |Current Timestamp     :     ${currentTs}
           |Merge Files           :     ${Processed.ts.minusHours(1)}
           |
           |""".stripMargin)

      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val ts = Processed.ts.minusHours(1)
      val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
      val path = new Path(hdfsPath)

      if (fs.exists(path)) {

        // List the data files of the previous hour's partition (assumes an
        // implicit conversion of the returned RemoteIterator to a Scala
        // collection, not shown here).
        val hdfsFiles = fs.listLocatedStatus(path)
          .filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
          .map(_.getPath).toList

        println(
          s"""
             |Total files in HDFS location  : ${hdfsFiles.length}
             | ${hdfsFiles.length > 1}
             |""".stripMargin)

        if (hdfsFiles.length > 1) {

          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total Available files : ${hdfsFiles.length}
               |Status                : Running
               |
               |""".stripMargin)

          // Read the whole partition, collapse it to a single file in a temp
          // location, then overwrite the original partition from that copy.
          val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
          df.repartition(1)
            .write
            .format(configs.writeFormat)
            .mode("overwrite")
            .save(s"/tmp${hdfsPath}")

          df.unpersist()

          spark
            .read
            .format(configs.writeFormat)
            .load(s"/tmp${hdfsPath}")
            .write
            .format(configs.writeFormat)
            .mode("overwrite")
            .save(hdfsPath)

          // Move the merge window forward by one hour, truncated to the hour.
          Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total files           : ${hdfsFiles.length}
               |Status                : Completed
               |
               |""".stripMargin)
        }
      }
    }
  }

  def apply(config: Config, spark: SparkSession): AppListener = new AppListener(config, spark)
}

// Tracks the hour that was last merged (initialised to the current hour, UTC).
object Processed {
  var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}

Sometimes the data is huge, and I have divided it into multiple files using the logic below. The resulting file size will be around ~160 MB.

// Estimate the DataFrame size from the optimized plan statistics, then derive
// the number of output partitions from it (constants as in the original answer).
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (bytes.toLong / 1024.0 / 1024.0 / 10240).ceil.toInt

df.repartition(if (numPartitions == 0) 1 else numPartitions)
  .[...]
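The write itself is truncated above. Purely as an illustration, and assuming the same format and overwrite-per-partition pattern as the merge logic earlier (the path and config fields are those used in that code), the elided part might continue along these lines:

// Illustrative only: the original answer truncates the write; this assumes the
// repartitioned data is written back with the same format and overwrite mode
// used by the merge logic above.
df.repartition(if (numPartitions == 0) 1 else numPartitions)
  .write
  .format(configs.writeFormat)
  .mode("overwrite")
  .save(hdfsPath)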

Edit-1

Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes, we can get the size of the actual DataFrame once it is loaded into memory; for example, see the session below.

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

scala> import sys.process._
import sys.process._

scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
