Reading multiple files from S3 in parallel (Spark, Java)


Question

I saw a few discussions on this but couldn't quite understand the right solution: I want to load a couple hundred files from S3 into an RDD. Here is how I'm doing it now:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
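
As a side note, the "repeat while objectListing.isTruncated()" comment can be expanded into a pagination loop using the SDK's listNextBatchOfObjects. A minimal sketch, where the listAllKeys helper and its parameters are illustrative rather than part of the original code:

import java.util.LinkedList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;

// Collect every key under a prefix, paging through truncated listings.
// Sketch only: bucket/prefix come from the caller, client construction is omitted.
static List<String> listAllKeys(AmazonS3 s3, String bucket, String prefix) {
    List<String> keys = new LinkedList<>();
    ObjectListing listing = s3.listObjects(new ListObjectsRequest()
            .withBucketName(bucket)
            .withPrefix(prefix));
    while (true) {
        listing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey()));
        if (!listing.isTruncated()) {
            break;
        }
        listing = s3.listNextBatchOfObjects(listing); // fetch the next page
    }
    return keys;
}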

The ReadFromS3Function uses the AmazonS3 client:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();
    }
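
Since the try/finally bodies are elided above, here is one way the same reader could be written with try-with-resources so the S3 stream is always closed. This is only a sketch of the question's function, in the same class context as the original; bucket is a hypothetical field standing in for the elided request arguments, and s is the key handed in by flatMap:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        // bucket is assumed to be a field on this function; s is the S3 key
        S3Object object = s3Client.getObject(new GetObjectRequest(bucket, s));
        List<String> lines = new LinkedList<>();
        try (InputStream is = object.getObjectContent();
             BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
            String str;
            while ((str = reader.readLine()) != null) {
                lines.add(str);
            }
        }
        return lines.iterator();
    }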

I kind of "translated" this from answers I saw for the same question in Scala. I think it's also possible to pass the entire list of paths to sc.textFile(...), but I'm not sure which is the best-practice way.

Answer

The underlying problem is that listing objects in S3 is really slow, and the way it is made to look like a directory tree kills performance whenever something does a treewalk (as wildcard pattern matching of paths does).

The code in the post is doing the all-children listing, which delivers much better performance; it's essentially what ships with Hadoop 2.8 as s3a's listFiles(path, recursive). See HADOOP-13208 (https://issues.apache.org/jira/browse/HADOOP-13208).
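
For what it's worth, on Hadoop 2.8+ that flat recursive listing is reachable straight from the FileSystem API, so the s3a paths can be enumerated without touching the AWS SDK at all. A rough Java sketch; the root URI is a placeholder and the helper name is illustrative:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Flat, recursive "all children" listing through the s3a connector (Hadoop 2.8+).
// In a Spark job you would normally pass sc.hadoopConfiguration() instead of a fresh Configuration.
static List<String> listAllS3aPaths(String rootUri, Configuration conf) throws IOException {
    Path root = new Path(rootUri);            // e.g. "s3a://my-bucket/my/prefix/"
    FileSystem fs = root.getFileSystem(conf);
    List<String> paths = new ArrayList<>();
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true); // true = recursive
    while (it.hasNext()) {
        paths.add(it.next().getPath().toString());
    }
    return paths;
}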

After getting that listing, you've got strings of object paths which you can then map to s3a/s3n paths for Spark to handle as text file inputs, and to which you can then apply your work:

val files = keys.map(key => s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)

And as requested, here's the Java code used.

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix + summary.getKey()));
// repeat while objectListing is truncated
JavaRDD<String> events = sc.textFile(String.join(",", keys));

Note that I switched s3n to s3a, because, provided you have the hadoop-aws and amazon-sdk JARs on your classpath, the s3a connector is the one you should be using. It's better, and it's the one which gets maintained and tested against Spark workloads by people (me). See The history of Hadoop's S3 connectors.
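
If the s3a connector then complains about credentials, one way (of several; environment variables and EC2 instance profiles also work) to wire them up from Java is through the standard fs.s3a.* Hadoop properties. A sketch, assuming sc is your JavaSparkContext, keys is the list built above, and the key values are placeholders:

org.apache.hadoop.conf.Configuration hadoopConf = sc.hadoopConfiguration();
hadoopConf.set("fs.s3a.access.key", "<ACCESS_KEY>");   // placeholder
hadoopConf.set("fs.s3a.secret.key", "<SECRET_KEY>");   // placeholder
// With hadoop-aws and the AWS SDK on the classpath, s3a:// URIs now resolve:
JavaRDD<String> events = sc.textFile(String.join(",", keys));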
