Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python


Problem Description


I want to share this particular Apache Spark with Python solution because documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a Pairwise RDD), by KEY. Here is what the sample data looks like:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

Now the following code sequence is a less-than-optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible, but -- as you'll see in the answer section -- there is a more concise, efficient way.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. the COUNT).
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

Solution

Now a much better way to do this is to use the rdd.aggregateByKey() method. Because that method is so poorly documented in the Apache Spark with Python documentation (which is why I'm writing this), until recently I had been using the code sequence above. But again, it is less efficient: countByKey() is a separate action that pulls every per-key count back to the driver and then has to be broadcast back out to the executors, and reduceByKey() runs as yet another pass over the data, whereas aggregateByKey() computes the sums and counts in a single pass. So don't do it the old way unless you have to.

Here's how to do the same using the rdd.aggregateByKey() method (recommended) ...

By KEY, simultaneously calculate the SUM (numerator for the average we want to compute), and COUNT (denominator for the average we want to compute).

>>> rdd1 = rdd1.aggregateByKey((0,0), lambda a,b: (a[0] + b,    a[1] + 1),
                                      lambda a,b: (a[0] + b[0], a[1] + b[1]))

Here is what each 'a' and 'b' refers to in the lambdas above (just so you can visualize what's happening):

   First lambda expression, for the within-partition reduction step:
   a: is a TUPLE that holds (runningSum, runningCount).
   b: is a SCALAR that holds the next value.

   Second lambda expression, for the cross-partition reduction step:
   a: is a TUPLE that holds (runningSum, runningCount).
   b: is a TUPLE that holds (nextPartitionsSum, nextPartitionsCount).
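
If the positional a/b lambdas are hard to read, the same call can be written with named functions. This is just a readability sketch: the names seq_op and comb_op are mine, not part of the Spark API, and the behavior is identical to the lambdas above.

    def seq_op(acc, value):
        # Within-partition step: fold the next scalar value into the running (sum, count).
        return (acc[0] + value, acc[1] + 1)

    def comb_op(acc1, acc2):
        # Cross-partition step: merge two partial (sum, count) tuples.
        return (acc1[0] + acc2[0], acc1[1] + acc2[1])

    # (0, 0) is the zero value: the initial (sum, count) used for each key.
    rdd1 = rdd1.aggregateByKey((0, 0), seq_op, comb_op)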

Finally, calculate the average for each KEY, and collect results.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]
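
For anyone who wants to reproduce this end to end, here is a minimal, self-contained sketch. It assumes a working SparkContext named sc and uses only the ten sample pairs shown at the top, so the printed averages will differ from the full-dataset results above.

    # Build a small pairwise RDD from the sample data shown earlier.
    data = [(u'2013-10-09', 7.60117302052786),
            (u'2013-10-10', 9.322709163346612),
            (u'2013-10-10', 28.264462809917358),
            (u'2013-10-07', 9.664429530201343),
            (u'2013-10-07', 12.461538461538463),
            (u'2013-10-09', 20.76923076923077),
            (u'2013-10-08', 11.842105263157894),
            (u'2013-10-13', 32.32514177693762),
            (u'2013-10-13', 26.249999999999996),
            (u'2013-10-13', 10.693069306930692)]
    rdd1 = sc.parallelize(data)

    # One pass: per key, accumulate (sum, count), then divide to get the average.
    sumCount = rdd1.aggregateByKey((0, 0),
                                   lambda a, b: (a[0] + b,    a[1] + 1),
                                   lambda a, b: (a[0] + b[0], a[1] + b[1]))
    averageByKey = sumCount.mapValues(lambda v: v[0] / v[1])
    print(averageByKey.collect())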

I hope this aggregateByKey() illustration will help others.
