Spark Execution of TB file in memory


Problem description

Let us assume I have a one-TB data file. Each node in a ten-node cluster has 3GB of memory.

I want to process the file using Spark. But how does the one terabyte fit in memory?

Will it throw an out-of-memory exception?

How does it work?

Solution

As Thilo mentioned, Spark does not need to load everything in memory to be able to process it. This is because Spark will partition the data into smaller blocks and operate on these separately. The number of partitions, and thus their size, depends on several things:

  • Where the file is stored. The most commonly used options with Spark already store the file in a bunch of blocks rather than as a single big piece of data. If it's stored in HDFS, for instance, by default these blocks are 64MB and they are distributed (and replicated) across your nodes. With files stored in S3 you will get blocks of 32MB. This is determined by the Hadoop FileSystem used to read the files, and it applies to files from any other filesystem that Spark reads through Hadoop.
  • Any repartitioning you do. You can call repartition(N) or coalesce(N) on an RDD or a DataFrame to change the number of partitions and thus their size. coalesce is preferred for reducing the number without shuffling data across your nodes, whereas repartition lets you specify a new way to split up your data, i.e. gives you better control over which parts of the data are processed on the same node.
  • Some more advanced transformations you may do. For instance, the configuration setting spark.sql.shuffle.partitions, by default set to 200, determines how many partitions a DataFrame resulting from joins and aggregations will have. The sketch after this list shows these knobs in code.
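
For illustration only, here is a minimal Scala sketch of the partitioning knobs mentioned in this list; the input path, the partition counts and the SparkSession setup are made-up examples rather than anything from the original answer:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partitioning-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical input; Spark creates roughly one partition per HDFS block
val lines = sc.textFile("hdfs:///data/one_tb_file.txt")
println(lines.getNumPartitions)

// coalesce: shrink the partition count without a full shuffle across nodes
val fewer = lines.coalesce(500)

// repartition: full shuffle into the requested number of evenly sized partitions
val evenly = lines.repartition(2000)

// number of partitions produced by joins/aggregations on DataFrames (default 200)
spark.conf.set("spark.sql.shuffle.partitions", "400")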

Caching

The previous bit relates to just standard processing of data in Spark, but I feel you may be led to the wrong ideas because Spark is advertised as 'in-memory', so I wanted to address that a bit. By default there is nothing in Spark that is more 'in-memory' than any other data processing tool: a simple pipeline such as sc.textFile(foo).map(mapFunc).saveAsTextFile(bar) reads a file (block by block, distributed over your nodes), does the mapping in memory (like any computer program) and then saves the result to storage again. Spark's use of memory becomes more interesting in the following (in Scala as I'm more familiar with it, but the concepts and method names are exactly the same in Python):

val rdd = sc.textFile(foo)
// Do some preprocessing, such as parsing lines
val preprocessed = rdd.map(preprocessFunc)
// Tell Spark to cache preprocessed data (by default in memory)
preprocessed.cache()
// Perform some mapping and save output
preprocessed.map(mapFunc1).saveAsTextFile(outFile1)
// Perform a different mapping and save somewhere else
preprocessed.map(mapFunc2).saveAsTextFile(outFile2)

The idea here is to use cache() so the preprocessing (possibly) doesn't have to be done twice; by default Spark doesn't save any intermediate results, but recomputes the full chain for each separate action, and the 'actions' here are the saveAsTextFile calls.
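
A hedged way to see this recomputation happen is to count calls with an accumulator; the counter and variable names are illustrative and not part of the original answer (task retries can also inflate the count slightly):

val calls = sc.longAccumulator("preprocess-calls")
val counted = rdd.map { line => calls.add(1); preprocessFunc(line) }

// Without cache(), each of the two actions recomputes the map, so the counter
// ends up near twice the number of input lines
counted.map(mapFunc1).saveAsTextFile(outFile1)
counted.map(mapFunc2).saveAsTextFile(outFile2)
println(calls.value)

// With counted.cache() before the two actions it stays close to the number of input lines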

I said 'possibly' because the ability to actually cache data is limited by the memory in your nodes. Spark reserves a certain amount of memory for cache storage, separate from work memory (see http://spark.apache.org/docs/latest/configuration.html#memory-management for how the sizes of these parts of memory are managed), and it can only cache as much as that amount can hold.
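
As a rough illustration of the settings behind that link: the executor size below mirrors the 3GB nodes in the question and the fractions are just the documented defaults, not a tuning recommendation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-sketch")
  .config("spark.executor.memory", "3g")          // heap per executor
  .config("spark.memory.fraction", "0.6")         // share of the heap for execution + storage (documented default)
  .config("spark.memory.storageFraction", "0.5")  // part of that share protected for cached blocks (documented default)
  .getOrCreate()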

Depending on your partitioning it may be less, though. Let's say you have 2GB of storage memory on each of your 3 nodes and the data in preprocessed is 6GB. If this data has 3 partitions, it will fit perfectly and all input data to mapFunc2 will be loaded from memory. But if you have, say, 4 partitions of 1.5GB each, only 1 partition can be cached on each node; the 4th partition won't fit in the 0.5GB that is still left on each machine, so this partition has to be recalculated for the second mapping and only 3/4 of your preprocessed data will be read from memory.
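
Sticking with the numbers from this example, one way to avoid the partially cached case is to choose the partition count before caching; this is a sketch that reuses the variable names from the snippet above:

// 6GB of preprocessed data, 3 executors with ~2GB of storage memory each:
// 3 partitions of ~2GB can all be cached, 4 partitions of 1.5GB cannot.
val preprocessed = rdd.map(preprocessFunc).repartition(3)
preprocessed.cache()

// The Storage tab of the Spark UI shows how much of the RDD actually got cached.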

So in this sense it is better to have many small partitions, to make caching as efficient as possible, but this may have other downsides: more overhead, huge delays if you happen to use Mesos in fine-grained mode, and tons of small output files (if you don't coalesce before saving), since Spark saves each partition as a separate file.
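
For that last downside, a small illustrative example of merging partitions right before writing (the number 10 is arbitrary; the trade-off is less parallelism during the save itself):

// Coalesce just before saving to avoid thousands of tiny output files
preprocessed.map(mapFunc1).coalesce(10).saveAsTextFile(outFile1)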

As Durga mentioned, there is also the possibility of having data that does not fit in memory spill to disk; you can follow his link for that :)
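
For completeness, a sketch of the spill-to-disk behaviour mentioned here, using the standard StorageLevel API in place of the plain cache() call from earlier (not something taken from the linked answer):

import org.apache.spark.storage.StorageLevel

// Instead of cache() (which is MEMORY_ONLY for RDDs), persist with a level that
// writes partitions that don't fit in storage memory to local disk
preprocessed.persist(StorageLevel.MEMORY_AND_DISK)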
