How does Spark aggregate function - aggregateByKey work?


Problem description

Say I have a distributed system on 3 nodes and my data is distributed among those nodes. For example, I have a test.csv file which exists on all 3 nodes and contains 2 columns:

row   | id , c
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

Then I use SparkContext.textFile to read the file in as an RDD, and so on. As far as I understand, each Spark worker node will read a portion of the file. So right now let's say each node will store:

  • Node 1: rows 1~4
  • Node 2: rows 5~8
  • Node 3: rows 9~12
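
To make the setup concrete, here is a minimal sketch (not part of the original question) of how that read might look, assuming the comma-separated test.csv layout above and an existing SparkContext named sc:

```scala
// Read the file and turn each line "k1 , c1" into a (key, value) pair.
// Spark splits the file into partitions, so different nodes hold different rows.
val pairs = sc.textFile("test.csv")
  .map(_.split(","))
  .map(cols => (cols(0).trim, cols(1).trim))   // e.g. (k1, c1), (k1, c2), ...
```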

My question is: let's say I want to do a computation on that data, and there is one step where I need to group by key, so the key-value pairs would be [k1 [{k1 c1} {k1 c2} {k1 c3}]] .. and so on.

There is a function called groupByKey() which is very expensive to use, and aggregateByKey() is recommended instead. So I'm wondering how groupByKey() and aggregateByKey() work under the hood? Can someone explain using the example I provided above? After shuffling, where do the rows reside on each node?
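
For reference (again, my own sketch rather than part of the original question), the grouping described above would look roughly like this with groupByKey(), assuming the pairs RDD sketched earlier:

```scala
// All values for a key are pulled together into one Iterable, e.g.
// (k1, Iterable(c1, c2, c3)), (k2, Iterable(c4, c5, c6)), ...
val grouped = pairs.groupByKey()
grouped.collect().foreach(println)
```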

Recommended answer

aggregateByKey() is almost identical to reduceByKey() (both call combineByKey() behind the scenes), except that you give a starting value for aggregateByKey(). Most people are familiar with reduceByKey(), so I will use that in the explanation.
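
As an illustration (my own example, not from the answer), counting the values per key shows the difference in shape between the two calls:

```scala
// reduceByKey merges values of the same type as the input values, so we first
// map every value to 1; aggregateByKey instead starts each key at a zero value
// (0 here) and takes two functions: seqOp folds a value into the accumulator
// within a partition, combOp merges accumulators from different partitions.
val countsViaReduce    = pairs.mapValues(_ => 1).reduceByKey(_ + _)
val countsViaAggregate = pairs.aggregateByKey(0)((acc, _) => acc + 1, (a, b) => a + b)
// Both produce (k1,3), (k2,3), (k3,3), (k4,3) for the example data.
```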

The reason reduceByKey() is so much better is that it makes use of a MapReduce feature called a combiner. Any function like + or * can be used in this fashion because the order of the elements it is called on doesn't matter. This allows Spark to start "reducing" values with the same key even if they are not all in the same partition yet.

On the flip side, groupByKey() gives you more versatility since you write a function that takes an Iterable, meaning you could even pull all the elements into an array. However, it is inefficient because for it to work the full set of (K,V) pairs has to be in one partition.
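
If the per-key collection is really what you need, one common pattern (a sketch of my own, not from the answer) is to build it with aggregateByKey(); note that this still shuffles every value, so the real savings come from seqOp functions that actually shrink the data:

```scala
// Zero value is an empty list; values are prepended within each partition and
// the partial lists are concatenated across partitions after the shuffle.
val valuesPerKey = pairs.aggregateByKey(List.empty[String])(
  (acc, v) => v :: acc,     // seqOp: add one value to this partition's list
  (a, b)   => a ::: b       // combOp: merge lists built on different nodes
)
```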

The step that moves the data around in a reduce-type operation is generally called the shuffle. At the very simplest level, the data is partitioned to each node (often with a hash partitioner) and then sorted on each node.
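
To address the "where do the rows reside after shuffling" part concretely, here is a rough sketch (my own, assuming 3 partitions and the default HashPartitioner) of how keys are assigned to partitions:

```scala
import org.apache.spark.HashPartitioner

// Every key is mapped to a partition by hashing, so after the shuffle all rows
// for k1 sit together in one partition, all rows for k2 in another, and so on.
// Which node holds which key depends only on the key's hash, not on where the
// rows started out.
val partitioner = new HashPartitioner(3)
val byPartition = Seq("k1", "k2", "k3", "k4")
  .map(k => k -> partitioner.getPartition(k))   // which of the 3 partitions each key lands in
```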
