Spark lists all leaf nodes even in partitioned data


Problem description


I have parquet data partitioned by date & hour, folder structure:

events_v3
  -- event_date=2015-01-01
    -- event_hour=2015-01-1
      -- part10000.parquet.gz
  -- event_date=2015-01-02
    -- event_hour=5
      -- part10000.parquet.gz

I have created a table raw_events via Spark, but when I try to query it, it scans all the directories for footers, which slows down the initial query, even if I am querying only one day's worth of data.

query: select * from raw_events where event_date='2016-01-01'

similar problem: http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCAAswR-7Qbd2tdLSsO76zyw9tvs-Njw2YVd36bRfCG3DKZrH0tw@mail.gmail.com%3E (but it's old)

Log:

App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/

and then it spawns 350 tasks since there are 350 days' worth of data.

I have disabled schemaMerge and have also specified the schema to read, so it should be able to go straight to the partition I am looking at. Why does it list all the leaf files? Listing the leaf files with 2 executors takes 10 minutes, while the actual query execution takes only 20 seconds.

code sample:

val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
    df.createOrReplaceTempView("temp_events")
    sparkSession.sql(
      """
        |select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb
      """.stripMargin).show()

Solution

As soon as Spark is given a directory to read from, it issues a call to listLeafFiles (org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala). This in turn calls fs.listStatus, which makes an API call to get the list of files and directories. The same method is then called again for each directory, recursively, until no directories are left. This design works well on HDFS, but badly on S3, because each file listing is an RPC call. S3, on the other hand, supports fetching all files by prefix, which is exactly what we need.
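
A minimal sketch of that recursion (not Spark's actual code, which is quoted in full further down): every directory found triggers another fs.listStatus round trip, so a date=/hour= layout pays one call per day, per hour, per sub-directory.

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Conceptual sketch of the recursive listing pattern described above.
// On HDFS each listStatus call is cheap; on S3 each one is a separate remote request.
def listLeafFilesNaive(fs: FileSystem, path: Path): Array[FileStatus] = {
  val (dirs, files) = fs.listStatus(path).partition(_.isDirectory)
  files ++ dirs.flatMap(dir => listLeafFilesNaive(fs, dir.getPath))
}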

So, for example, if we had the directory structure above with one year's worth of data, a directory per hour and 10 sub-directories under each, we would make 365 * 24 * 10 = 87k API calls. This can be reduced to 138 API calls, given that there are only 137,000 files and each S3 API call returns at most 1,000 keys.
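
A quick back-of-the-envelope check of those numbers, using only the figures quoted above:

// One listStatus call per directory when listing recursively:
val recursiveCalls = 365 * 24 * 10                               // 87,600, i.e. the ~87k calls above
// A flat prefix listing pages through at most 1,000 keys per listObjects call:
val totalFiles  = 137000
val maxKeys     = 1000
val prefixCalls = math.ceil(totalFiles.toDouble / maxKeys).toInt // ~137, in line with the 138 calls above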

Code: org/apache/hadoop/fs/s3a/S3AFileSystem.java

public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
            IOException {
        String key = pathToKey(f);
        if (LOG.isDebugEnabled()) {
            LOG.debug("List status for path: " + f);
        }

        final List<FileStatus> result = new ArrayList<FileStatus>();
        final FileStatus fileStatus =  getFileStatus(f);

        if (fileStatus.isDirectory()) {
            if (!key.isEmpty()) {
                key = key + "/";
            }

            ListObjectsRequest request = new ListObjectsRequest();
            request.setBucketName(bucket);
            request.setPrefix(key);
            request.setMaxKeys(maxKeys);

            if (LOG.isDebugEnabled()) {
                LOG.debug("listStatus: doing listObjects for directory " + key);
            }

            ObjectListing objects = s3.listObjects(request);
            statistics.incrementReadOps(1);

            while (true) {
                for (S3ObjectSummary summary : objects.getObjectSummaries()) {
                    Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
                    // Skip over keys that are ourselves and old S3N _$folder$ files
                    if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ignoring: " + keyPath);
                        }
                        continue;
                    }

                    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
                        result.add(new S3AFileStatus(true, true, keyPath));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fd: " + keyPath);
                        }
                    } else {
                        result.add(new S3AFileStatus(summary.getSize(),
                                dateToLong(summary.getLastModified()), keyPath,
                                getDefaultBlockSize(f.makeQualified(uri, workingDir))));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fi: " + keyPath);
                        }
                    }
                }

                for (String prefix : objects.getCommonPrefixes()) {
                    Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
                    if (keyPath.equals(f)) {
                        continue;
                    }
                    result.add(new S3AFileStatus(true, false, keyPath));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding: rd: " + keyPath);
                    }
                }

                if (objects.isTruncated()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("listStatus: list truncated - getting next batch");
                    }

                    objects = s3.listNextBatchOfObjects(objects);
                    statistics.incrementReadOps(1);
                } else {
                    break;
                }
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Adding: rd (not a dir): " + f);
            }
            result.add(fileStatus);
        }

        return result.toArray(new FileStatus[result.size()]);
    }

Code: org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty[FileStatus]
    }
    else {
      val statuses = {
        val stats = if(fs.isInstanceOf[S3AFileSystem]){
          logWarning("Using Monkey patched version of list status")
          println("Using Monkey patched version of list status")
          val a = fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath)
          a
//          Array.empty[FileStatus]
        }
        else{
          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))

        }
        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
      }
      // statuses do not have any dirs.
      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus => f

        // NOTE:
        //
        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPC for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
        //   paths exceeds threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
      }
    }
  }
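
The patch above simply short-circuits the generic recursion whenever the FileSystem is an S3AFileSystem, trading one listStatus call per directory for a single paginated prefix listing. A hypothetical way to check its effect is to time the first query, which is what triggers the leaf-file listing (the path and query are the ones from the question; with the shortcut in place the listing should drop from minutes to seconds):

// Hypothetical smoke test -- depending on the Spark version the listing is triggered
// at load time or by the first query, so time the whole sequence.
val t0 = System.nanoTime()
val df = sparkSession.read
  .option("mergeSchema", "false")
  .format("parquet")
  .load("s3a://bucket/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
  """
    |select verb, count(*) from temp_events where event_date = "2016-01-01" group by verb
  """.stripMargin).show()
println(s"First query, including leaf-file listing, took ${(System.nanoTime() - t0) / 1e9} s")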
