Explain the aggregate functionality in Spark using python


Problem description


I am looking for some better explanation of the aggregate functionality that is available via spark in python.

The example I have is as follows (using pyspark from Spark 1.2.0):

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:

(10, 4)

I get the expected result (10,4), which is the sum of 1+2+3+4 and the count of 4 elements. If I change the initial value passed to the aggregate function from (0,0) to (1,0), I get the following result:

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:

(19, 4)

The value goes up by 9. If I change it to (2,0), the value becomes (28,4), and so on.

Can someone explain to me how this value is calculated? I expected the value to go up by 1, not by 9; I expected to see (11,4), but instead I am seeing (19,4).

Answer


Aggregate lets you transform and combine the values of the RDD at will.

It uses two functions:

The first one transforms and adds the elements of the original collection [T] into a local aggregate [U], and takes the form (U, T) => U. You can see it as a fold, and therefore it also requires a zero for that operation. This operation is applied locally to each partition in parallel.

Here is where the key to the question lies: the only value that should be used here is the ZERO value for the reduction operation. This operation is executed locally on each partition; therefore, anything you add to that zero value gets added to the result multiplied by the number of partitions of the RDD.
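
A quick way to see this from pyspark is to fix the number of partitions and compare a neutral zero with a non-neutral one. This is just a small sketch: it assumes an already-created SparkContext named sc (as in the question) and forces 2 partitions so the effect is predictable.

rdd = sc.parallelize([1, 2, 3, 4], 2)   # force 2 partitions
print(rdd.getNumPartitions())           # 2

seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

print(rdd.aggregate((0, 0), seq_op, comb_op))  # (10, 4): a neutral zero leaves the sum untouched
# With a non-neutral zero such as (1, 0), the extra amount grows with the number
# of partitions, because each partition starts its local fold from (1, 0)
# (and the combine step typically folds it in once more) -- which is why the
# asker, with more partitions, saw the sum jump by 9.
print(rdd.aggregate((1, 0), seq_op, comb_op))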

The second operation takes two values of the result type of the previous operation [U] and combines them into one value. This operation reduces the partial results of each partition and produces the actual total.
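
To make that second step concrete, here is a plain-Python illustration (with hypothetical per-partition results) of what the combine function receives once every partition has produced its local (sum, count) pair:

from functools import reduce

partials = [(3, 2), (7, 2)]   # hypothetical: partition 1 held [1, 2], partition 2 held [3, 4]
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

print(reduce(comb_op, partials))  # (10, 4) -- the actual total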

For example: given an RDD of Strings:

val rdd: RDD[String] = ???

Let's say you want to aggregate the lengths of the strings in that RDD, so you would do the following.

1) The first operation transforms each string into its size (Int) and accumulates the sizes:

val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.length

2) Provide the ZERO for the addition operation (0):

val ZERO = 0

3) An operation to add two integers together:

val add: (Int, Int) => Int = _ + _

Putting it all together:

rdd.aggregate(ZERO)(stringSizeCummulator, add)

So, why is the ZERO needed? When the cummulator function is applied to the first element of a partition, there is no running total yet; ZERO is used there.

E.g., my RDD is:
- Partition 1: ["Jump", "over"]
- Partition 2: ["the", "wall"]

This will result in:

P1:

  1. stringSizeCummulator(ZERO, "Jump") = 4
  2. stringSizeCummulator(4, "over") = 8

P2:

  1. stringSizeCummulator(ZERO, "the") = 3
  2. stringSizeCummulator(3, "wall") = 7


Reduce: add(P1, P2) = 15
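
Since the question is about pyspark, here is a rough Python equivalent of the same string-length aggregation. It assumes a SparkContext named sc, and the two-partition layout mirrors the example above.

words = sc.parallelize(["Jump", "over", "the", "wall"], 2)

total_length = words.aggregate(
    0,                                  # ZERO for the addition
    lambda total, s: total + len(s),    # stringSizeCummulator: fold each string's length into the local total
    lambda a, b: a + b)                 # add: merge the per-partition totals

print(total_length)  # 15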
