ReduceByKey function in Spark


Question

I've read somewhere that for operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master. This means that I have to declare a partitioner like:

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
             .partitionBy(new HashPartitioner(100))   // Create 100 partitions
             .persist() 

in order for reduceByKey to work as I explained previously.

My question is: if I want to use reduceByKey optimally, do I need to declare a partitioner every time, or is it not necessary?
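
Concretely, the two options I am weighing would look something like this minimal sketch (the word-count pairs and the HashPartitioner(100) are just placeholders mirroring the snippet above, not my real data):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ReduceByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(word => (word, 1))

    // Option 1: just call reduceByKey and let Spark choose the partitioner.
    val counts1 = pairs.reduceByKey(_ + _)

    // Option 2: pre-partition (and persist) first, then reduce.
    val counts2 = pairs
      .partitionBy(new HashPartitioner(100))
      .persist()
      .reduceByKey(_ + _)

    counts1.collect().foreach(println)
    counts2.collect().foreach(println)
    sc.stop()
  }
}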

Answer

Actually, the two qualities you are talking about are somewhat unrelated.

For reduceByKey(), the first quality means that elements with the same key are first aggregated locally on each executor with the provided associative reduce function, and the partial results are then aggregated across executors. This behaviour is encapsulated in a boolean parameter called mapSideCombine which, if set to true, does the above. If set to false, as is the case with groupByKey(), each record will be shuffled and sent to the correct executor.
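
A rough sketch of that distinction, assuming Spark 1.6+ where PairRDDFunctions.combineByKeyWithClassTag exposes the mapSideCombine flag (reduceByKey is essentially the mapSideCombine = true case with the same function used for both merge steps):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

def sketchMapSideCombine(pairs: RDD[(String, Int)]): Unit = {
  val partitioner = new HashPartitioner(pairs.sparkContext.defaultParallelism)

  // reduceByKey-like: combine locally on each executor first (mapSideCombine = true),
  // so only one partially reduced value per key per partition is shuffled.
  val reduced = pairs.combineByKeyWithClassTag[Int](
    (v: Int) => v,                   // createCombiner
    (c: Int, v: Int) => c + v,       // mergeValue, applied within a partition
    (c1: Int, c2: Int) => c1 + c2,   // mergeCombiners, applied across partitions
    partitioner,
    mapSideCombine = true)

  // groupByKey-like: no map-side combine, so every record is shuffled to its target executor.
  val grouped = pairs.groupByKey(partitioner)
}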

The second quality concerns partitioning and how it is used. Each RDD, by virtue of its definition, contains a list of splits and (optionally) a partitioner. The method reduceByKey() is overloaded and actually has a few definitions. For example:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

This definition of the method uses the default existing partitioner from the parent RDD and reduces down to the number of partitions given by the default parallelism level.
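
For instance, a word count using this overload could look like the following (pairs is a hypothetical RDD[(String, Int)]):

import org.apache.spark.rdd.RDD

// No partitioner argument: the parent RDD's partitioner (or the default
// parallelism level) decides the partitioning of the result.
def wordCount(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.reduceByKey(_ + _)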

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

This definition of the method will use a HashPartitioner to assign data to their corresponding executors, and the number of partitions will be numPartitions.
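
For instance, with this overload the target partition count is passed explicitly and Spark builds the HashPartitioner for you (again using a hypothetical pairs RDD):

import org.apache.spark.rdd.RDD

// Passing 8 as numPartitions: internally equivalent to reduceByKey(new HashPartitioner(8), _ + _).
def wordCountIn8Partitions(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.reduceByKey(_ + _, 8)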

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

Finally, this definition of the method supersedes the other two and takes in a generic (possibly custom) partitioner; the number of partitions produced is determined by how that partitioner partitions the keys.
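
For instance, a toy custom Partitioner (hypothetical, for illustration only) can be passed straight into this overload:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// A toy partitioner that routes keys by their first character.
class FirstLetterPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int =
    math.abs(key.toString.headOption.getOrElse(' ').toInt) % parts
}

def wordCountByFirstLetter(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.reduceByKey(new FirstLetterPartitioner(4), _ + _)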

The point is that you can actually encode your desired partitioner logic within the reduceByKey() call itself. If your intention was to avoid shuffling overhead by pre-partitioning, it doesn't really make sense either, since you will still be shuffling during the pre-partitioning step.
