ReduceByKey function in Spark
Question
I've read somewhere that for operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master. This means I have to declare a partitioner, like:
import org.apache.spark.{SparkContext, HashPartitioner}

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100)) // Create 100 partitions
  .persist()
in order for reduceByKey to work as I explained previously.
My question is: if I want to use reduceByKey (optimally), do I need to declare a partitioner every time, or is it not necessary?
Answer
Actually, the two qualities you are talking about are somewhat unrelated.
For reduceByKey(), the first quality aggregates elements of the same key with the provided associative reduce function, first locally on each executor, and then eventually across executors. It is encapsulated in a boolean parameter called mapSideCombine which, if set to true, does the above. If set to false, as is the case with groupByKey(), each record will be shuffled and sent to the correct executor.
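As a minimal sketch of the difference (the SparkContext setup and the sample data below are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("combine-demo").setMaster("local[*]"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1)))

// reduceByKey combines on the map side first: each executor pre-aggregates
// its own ("a", n) records, so only one partial sum per key per executor
// crosses the network during the shuffle.
val reduced = pairs.reduceByKey(_ + _)

// groupByKey does no map-side combining: every individual ("a", 1) record
// is shuffled to the executor that owns key "a" before any aggregation.
val grouped = pairs.groupByKey().mapValues(_.sum)

println(reduced.collect().toMap) // Map(a -> 3, b -> 1)
println(grouped.collect().toMap) // Map(a -> 3, b -> 1)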
The second quality concerns partitioning and how it is used. Each RDD, by virtue of its definition, contains a list of splits and (optionally) a partitioner. The method reduceByKey() is overloaded and actually has a few definitions. For example:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
This definition of the method uses the existing partitioner from the parent RDD if one is set; otherwise it hash-partitions with the number of partitions set to the default parallelism level.
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
This definition of the method will use a HashPartitioner to assign data to their corresponding executors, and the number of partitions will be numPartitions.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
Finally, this definition of the method supersedes the other two and takes in a generic (perhaps custom) partitioner that will produce the number of partitions determined by how that partitioner partitions the keys.
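As a rough illustration of the three overloads (the sample data and the toy FirstLetterPartitioner below are made up, and sc is assumed to be an existing SparkContext):

import org.apache.spark.{HashPartitioner, Partitioner}

val kv = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("apple", 3)))

// 1. No partitioner argument: reuses the parent RDD's partitioner if it has
//    one, otherwise hash-partitions at the default parallelism level.
val r1 = kv.reduceByKey(_ + _)

// 2. Explicit partition count: shuffles with new HashPartitioner(8).
val r2 = kv.reduceByKey(_ + _, 8)

// 3. Explicit (possibly custom) partitioner: the output has exactly
//    partitioner.numPartitions partitions.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int =
    if (key.toString.startsWith("a")) 0 else 1
}
val r3 = kv.reduceByKey(new FirstLetterPartitioner, _ + _)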
The point of that is that you can actually encode your desired partitioner logic within the reduceByKey() call itself. If your intention was to avoid shuffling overhead by pre-partitioning, it doesn't really make sense either, since you will still be shuffling during the pre-partitioning step.