Spark writing/reading to/from S3 - Partition Size and Compression


Problem Description

I am doing an experiment to understand which file size behaves best with S3 and [EMR + Spark].

Input data: incompressible data (random bytes in files)
Total data size: 20 GB
Each folder contains input files of a different size, ranging from 2 MB to 4 GB per file.

Cluster specification: 1 master + 4 nodes (c3.8xlarge), with --driver-memory 5G --executor-memory 3G --executor-cores 2 --num-executors 60

Code:

scala> def time[R](block: => R): R = {
         val t0 = System.nanoTime()
         val result = block    // call-by-name
         val t1 = System.nanoTime()
         println("Elapsed time: " + (t1 - t0) + "ns")
         result
       }
time: [R](block: => R)R

scala> val inputFiles = time{sc.textFile("s3://bucket/folder/2mb-10240files-20gb/*/*")};
scala> val outputFiles = time {inputFiles.saveAsTextFile("s3://bucket/folder-out/2mb-10240files-20gb/")};

Observations =>

  • 2 MB - 32 MB: most of the time is spent opening file handles [not efficient].
  • 64 MB - 1 GB: Spark launches 320 tasks for all of these file sizes; the task count no longer matches the number of files holding the 20 GB of data. For example, at 512 MB there were 40 files making up the 20 GB, which could have been just 40 tasks, but instead there were 320 tasks, each dealing with 64 MB of data.
  • 4 GB file size: 0 bytes outputted [not able to handle in memory / data not even splittable???]

Questions =>

  • Is there any default setting that forces the input to be dealt with in 64 MB chunks?
  • Since the data I am using is random bytes and is already compressed, how is Spark splitting this data further? If it can split this data, why is it not able to split a 4 GB object file?
  • Why does the compressed file size increase after uploading via Spark? The 2 MB compressed input file becomes 3.6 MB in the output bucket.

Recommended Answer

Since it is not specified, I'm assuming gzip compression and Spark 2.2 in my answer.

  • Is there any default setting that forces the input to be dealt with in 64 MB chunks?

Yes, there is. Spark is a Hadoop project, and therefore treats S3 as a block-based file system even though it is an object store. So the real question here is: which implementation of the S3 file system are you using (s3a, s3n, etc.)? A similar question can be found here.
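As a rough illustration: the "block size" the S3 connector reports for each object is what Hadoop's input format uses when computing splits. A minimal sketch, assuming the s3a connector (fs.s3a.block.size is that connector's block-size setting; the bucket path is hypothetical):

// Raising the reported block size reduces the number of splits per large file.
// Set it on the Hadoop configuration before creating the RDD.
sc.hadoopConfiguration.set("fs.s3a.block.size", (256 * 1024 * 1024).toString) // 256 MB

val rdd = sc.textFile("s3a://bucket/folder/512mb-40files-20gb/*")
println(s"Number of partitions: ${rdd.getNumPartitions}")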

  • Since the data I am using is random bytes and is already compressed, how is Spark splitting this data further? If it can split this data, why is it not able to split a 4 GB object file?

The Spark docs indicate that it is capable of reading compressed files:

All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

This means that your files were read quite easily, with each line converted to a plaintext string.

However, you are using compressed files. Assuming it is a non-splittable format such as gzip, the entire file is needed for decompression. You are running with 3 GB executors, which can satisfy the needs of 4 MB - 1 GB files quite well, but cannot handle a file larger than 3 GB at once (probably less after accounting for overhead).

Some further information can be found in this question. Details of splittable compression types can be found in this answer.
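To see the splittability point concretely, a small sketch (the paths and file counts are hypothetical, modeled on the question's layout):

// A non-splittable .gz file always becomes a single partition, so the
// partition count equals the file count, however large each file is.
val gzipped = sc.textFile("s3://bucket/folder/4gb-5files-20gb/*.gz")
println(s"Partitions for gzipped input: ${gzipped.getNumPartitions}")

// Plain (uncompressed) text is instead split by the reported block size,
// which is how 40 x 512 MB files can show up as 320 tasks of ~64 MB each.
val plain = sc.textFile("s3://bucket/folder/512mb-40files-20gb/*")
println(s"Partitions for plain input: ${plain.getNumPartitions}")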

  • Why does the compressed file size increase after uploading via Spark? The 2 MB compressed input file becomes 3.6 MB in the output bucket.

As a corollary to the previous point, this means that Spark decompressed the RDD while reading it as plaintext. When it is re-uploaded, it is no longer compressed. To compress the output, you can pass a compression codec as a parameter:

// saveAsTextFile is an RDD method; pass the codec class as the second argument
inputFiles.saveAsTextFile("s3://path", classOf[org.apache.hadoop.io.compress.GzipCodec])

There are other compression formats available.
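For instance, if splittable output is wanted, one option is a splittable codec such as bzip2. A sketch under the same assumptions (the output path is hypothetical):

import org.apache.hadoop.io.compress.BZip2Codec

// BZip2 output can be split again when read back, unlike gzip output,
// at the cost of slower compression.
inputFiles.saveAsTextFile("s3://bucket/folder-out-bzip2/", classOf[BZip2Codec])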
