How to handle millions of smaller s3 files with Apache Spark


Problem description


So this problem has been driving me nuts, and it is starting to feel like Spark with s3 is not the right tool for this specific job. Basically, I have millions of smaller files in an s3 bucket. For reasons I can't necessarily get into, these files cannot be consolidated (for one, they are unique encrypted transcripts). I have seen questions similar to this one, and not a single solution has produced good results. The first thing I tried was wildcards:

sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();


Note: the count was more for debugging how long it would take to process the files. This job took almost an entire day with over 10 instances, but still failed with the error posted at the bottom of this post. I then found this link, which basically says this isn't optimal: https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html


Then I decided to try another solution, which I can't find at the moment, that said to load all of the paths and then union all of the rdds:

    ObjectListing objectListing = s3Client.listObjects(bucket);
    List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
    List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
    // first page of results from the initial objectListing: one wholeTextFiles rdd per transcript
    tempMeta.addAll(objectListing.getObjectSummaries().stream()
            .map(func)
            .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
            .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
            .collect(Collectors.toList()));

    // page through the rest of the bucket listing
    while(objectListing.isTruncated()) {
        objectListing = s3Client.listNextBatchOfObjects(objectListing);
        tempMeta.addAll(objectListing.getObjectSummaries().stream()
                .map(func)
                .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
                .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
                .collect(Collectors.toList()));
        // periodically move the accumulated rdds into the main list
        if (tempMeta.size() > 5000) {
            rdds.addAll(tempMeta);
            tempMeta = new ArrayList<>();
        }
    }

    if (!tempMeta.isEmpty()){
        rdds.addAll(tempMeta);
    }
    // union the first rdd with all of the remaining ones
    return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));


Then, even when I set the emrfs-site config to:

{
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.retryPolicyType": "fixed",
      "fs.s3.consistent.retryPeriodSeconds": "15",
      "fs.s3.consistent.retryCount": "20",
      "fs.s3.enableServerSideEncryption": "true",
      "fs.s3.consistent": "false"
    }
}


Every time I tried running the job, I got this error within 6 hours:

17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond


So first, is there a way to use smaller files with Spark from s3? I don't care if the solution is suboptimal, I just want to try to get something working. I thought about trying Spark Streaming, since its internals are a little different when it comes to loading all of the files. I would then use fileStream and set newFilesOnly to false. Then I could batch process them. However, that is not what Spark Streaming was built for, so I am conflicted about going that route.
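
For reference, here's a rough sketch of what I have in mind (untested at this scale; the 5-minute batch interval, the prefix, and the .txt filter are placeholder assumptions, and the key/value/input-format classes are the standard Hadoop text-input types):

    // reuse the same context as above; the batch interval is an arbitrary choice here
    JavaStreamingContext jssc =
            new JavaStreamingContext(SparkConfig.getSparkContext(), Durations.minutes(5));

    // newFilesOnly = false so the first batch also picks up files that already exist
    // under the monitored prefix, not just newly arriving ones
    JavaPairInputDStream<LongWritable, Text> transcripts = jssc.fileStream(
            "s3a://" + bucket + "/",
            LongWritable.class, Text.class, TextInputFormat.class,
            (Function<Path, Boolean>) path -> path.getName().endsWith(".txt"),
            false);

    transcripts.foreachRDD(rdd -> {
        // per-batch processing would go here; count() is only a stand-in
        System.out.println("records in batch: " + rdd.count());
    });

    jssc.start();
    jssc.awaitTermination();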


As a side note, I generated millions of small files into hdfs, tried the same job, and it finished within an hour. This makes me feel like it is s3 specific. Also, I am using s3a, not the ordinary s3.

Recommended answer


If you are using Amazon EMR, then you need to use s3:// URLs; the s3a:// ones are for the ASF releases.
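
As a tiny illustration of the scheme difference (assuming sc is an existing JavaSparkContext; the bucket and prefix names are placeholders):

    // On EMR, Amazon's EMRFS connector sits behind the s3:// scheme:
    JavaPairRDD<String, String> viaEmrfs =
            sc.wholeTextFiles("s3://randomBucket/transcripts/");

    // On an ASF Hadoop/Spark build, s3a:// is the connector to use:
    JavaPairRDD<String, String> viaS3a =
            sc.wholeTextFiles("s3a://randomBucket/transcripts/");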


A big issue is just how long it takes to list directory trees in s3, especially that recursive tree walk. The Spark code assumes it's a fast filesystem where listing directories and stat-ing files is low cost, whereas in fact each operation takes 1-4 HTTPS requests, which hurts even on reused HTTP/1.1 connections. It can be so slow you can see the pauses in the log.


Where this really hurts is the up-front partitioning, which is where a lot of the delay happens; it's that serialized bit of work which is being brought to its knees.
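
To put rough numbers on that (the figures below are illustrative assumptions, not measurements): with about 1,000,000 objects, 2 HTTPS round trips of metadata work per object, and perhaps 30 ms per round trip on a reused connection, the up-front partitioning alone costs on the order of

    1,000,000 objects x 2 requests x 0.030 s  ≈  60,000 s  ≈  16-17 hours

of serialized request time before any real work starts, which lines up with jobs that run for most of a day.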


Although there's some speedup in tree walking on S3a coming in Hadoop 2.8 as part of the S3a phase II work, wildcard scans of the //*.txt form aren't going to get any speedup. My recommendation is to flatten your directory structure so that you move from a deep tree to something shallow, maybe even everything in the same directory, so that it can be scanned without the walk, at a cost of 1 HTTP request per 5000 entries.
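
A minimal sketch of the difference from the Spark side, assuming an existing JavaSparkContext sc (the flattened prefix name is hypothetical):

    // Deep layout: each /*/ level triggers another round of listing before the scan.
    JavaPairRDD<String, String> deep =
            sc.wholeTextFiles("s3a://randomBucket/*/*/*/*.txt");

    // Flattened layout: a single shallow prefix that can be listed page by page
    // without walking a tree.
    JavaPairRDD<String, String> flat =
            sc.wholeTextFiles("s3a://randomBucket/transcripts-flat/");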


Bear in mind that many small files are pretty expensive anyway, including in HDFS, where they use up storage. There's a special aggregate format, HAR files, which are like tar files except that hadoop, hive and spark can all work inside the file itself. That may help, though I've not seen any actual performance test figures there.
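
A minimal sketch of what reading a HAR from Spark might look like, assuming the archive has already been built with the hadoop archive tool (every path and name below is made up for illustration):

    // The archive is created outside Spark, e.g. roughly:
    //   hadoop archive -archiveName transcripts.har -p /user/me/transcripts /user/me/archives
    // HAR is exposed as a Hadoop filesystem, so Spark should be able to read through it:
    JavaPairRDD<String, String> fromHar =
            SparkConfig.getSparkContext().wholeTextFiles("har:///user/me/archives/transcripts.har/");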

