When does shuffling occur in Apache Spark?


Question

I am optimizing parameters in Spark, and would like to know exactly how Spark is shuffling data.

Precisely, I have a simple word count program, and would like to know how spark.shuffle.file.buffer.kb is affecting the run time. Right now, I only see slowdown when I make this parameter very high (I am guessing this prevents every task's buffer from fitting in memory simultaneously).

Could someone explain how Spark is performing reductions? For example, the data is read and partitioned in an RDD, and when an "action" function is called, Spark sends out tasks to the worker nodes. If the action is a reduction, how does Spark handle this, and how are shuffle files / buffers related to this process?

Answer

Question: When does Spark trigger a shuffle?

Answer: Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use these data structures in the tasks for the stages on both sides of the shuffles they trigger.
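To make that distinction concrete, here is a minimal plain-Python sketch (not Spark code; the function names are illustrative) of the two kinds of per-key data structures: a reduceByKey-style combiner that folds values into a hashmap as they arrive on the map side, and a groupByKey-style grouping that must buffer every value on the fetch side.

```python
def map_side_combine(records, reduce_fn):
    # reduceByKey-style: fold each value into a running result per key,
    # so only one (key, partial result) pair per key needs to be shuffled.
    combined = {}
    for key, value in records:
        combined[key] = reduce_fn(combined[key], value) if key in combined else value
    return combined

def group_all(records):
    # groupByKey-style: every individual value must cross the shuffle
    # and be held in an in-memory buffer before grouping.
    groups = {}
    for key, value in records:
        groups.setdefault(key, []).append(value)
    return groups

words = [("a", 1), ("b", 1), ("a", 1), ("a", 1)]
print(map_side_combine(words, lambda x, y: x + y))  # {'a': 3, 'b': 1}
print(group_all(words))  # {'a': [1, 1, 1], 'b': [1]}
```

For the word-count case in the question this is exactly why reduceByKey shuffles far less data than groupByKey: the combiner collapses repeated keys before anything is written to shuffle files.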

Explanation: How does the shuffle operation work in Spark?

The shuffle operation is implemented differently in Spark compared to Hadoop. I don't know if you are familiar with how it works in Hadoop, but let's focus on Spark for now.

On the map side, each map task in Spark writes out a shuffle file (OS disk buffer) for every reducer – which corresponds to a logical block in Spark. These files are not intermediary in the sense that Spark does not merge them into larger partitioned ones. Since scheduling overhead in Spark is lower, the number of mappers (M) and reducers (R) is far higher than in Hadoop. Thus, shipping M*R files to the respective reducers could result in significant overhead.
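As a rough illustration of the M*R fan-out described above, this plain-Python sketch (illustrative only, not Spark internals) shows one map task hash-partitioning its output into one bucket per reducer; with M map tasks each doing this, M*R shuffle files result:

```python
def partition_map_output(records, num_reducers):
    # One bucket per reducer; in Spark, each bucket would be written
    # out as a separate shuffle file on the map side.
    buckets = [[] for _ in range(num_reducers)]
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets

records = [("spark", 1), ("shuffle", 1), ("spark", 1)]
buckets = partition_map_output(records, num_reducers=4)
```

Because the bucket is chosen by hashing the key, every occurrence of a given key lands in the same bucket, so a single reducer fetches all values for that key.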

Similar to Hadoop, Spark also provides a parameter, spark.shuffle.compress, to specify the compression library used to compress map outputs. In this case, it can be Snappy (the default) or LZF. Snappy uses only 33KB of buffer for each opened file and significantly reduces the risk of encountering out-of-memory errors.
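The 33KB figure matters because the buffer exists per open file, and the map side can have many shuffle files open at once. A quick back-of-the-envelope calculation (the task and reducer counts below are hypothetical, chosen only to show the arithmetic):

```python
KB = 1024
snappy_buffer = 33 * KB           # per-open-file buffer mentioned above
concurrent_map_tasks = 24         # hypothetical executor-wide figure
reducers = 100                    # hypothetical
open_files = concurrent_map_tasks * reducers  # one shuffle file per reducer
total_mb = open_files * snappy_buffer / (1024 * 1024)
print(total_mb)  # 77.34375 -> roughly 77MB of compression buffers
```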

On the reduce side, Spark requires all shuffled data to fit into the memory of the corresponding reducer task, unlike Hadoop, which had an option to spill it to disk. This would of course only happen in cases where the reducer task demands all shuffled data, for a groupByKey or a reduceByKey operation, for instance. Spark throws an out-of-memory exception in this case, which has proved quite a challenge for developers so far.

Also, Spark has no overlapping copy phase, unlike Hadoop, where mappers push data to the reducers even before the map phase is complete. This means that the shuffle is a pull operation in Spark, compared to a push operation in Hadoop. Each reducer also maintains a network buffer to fetch map outputs. The size of this buffer is specified through the parameter spark.reducer.maxMbInFlight (by default, 48MB).
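A pull-based fetch with a bounded in-flight buffer can be sketched as follows (plain Python, illustrative only; the real fetcher in Spark is considerably more involved):

```python
def plan_fetch_waves(block_sizes, max_bytes_in_flight):
    # Group remote blocks into request waves so that the bytes in
    # flight never exceed the reducer's network buffer (the role
    # played by spark.reducer.maxMbInFlight in the text above).
    waves, wave, in_flight = [], [], 0
    for block_id, size in block_sizes:
        if wave and in_flight + size > max_bytes_in_flight:
            waves.append(wave)
            wave, in_flight = [], 0
        wave.append(block_id)
        in_flight += size
    if wave:
        waves.append(wave)
    return waves

blocks = [("m0", 30), ("m1", 30), ("m2", 10)]
print(plan_fetch_waves(blocks, max_bytes_in_flight=48))  # [['m0'], ['m1', 'm2']]
```

The pull model means the reducer controls its own memory pressure: it simply does not request the next wave of map outputs until the current one fits within the buffer.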

For more information about shuffling in Apache Spark, I suggest the following readings:

  • Optimizing Shuffle Performance in Spark by Aaron Davidson and Andrew Or.
  • SPARK-751 JIRA issue and Consolidating Shuffle files by Jason Dai.
