Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python


Problem Description


I want to share this particular Apache Spark with Python solution because the documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a Pairwise RDD), by KEY. Here is what the sample data looks like:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
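
If you want to reproduce this sample locally, here is a minimal sketch. The local SparkContext setup is my assumption and is not part of the original question; only the (date, value) pairs come from the sample above:

from pyspark import SparkContext

# Assumption: a local SparkContext; in a real cluster or in the pyspark shell, sc already exists.
sc = SparkContext("local[*]", "avg-by-key-example")

# Rebuild the sample shown above as a pairwise (K, V) RDD.
rdd1 = sc.parallelize([
    (u'2013-10-09', 7.60117302052786),
    (u'2013-10-10', 9.322709163346612),
    (u'2013-10-10', 28.264462809917358),
    (u'2013-10-07', 9.664429530201343),
    (u'2013-10-07', 12.461538461538463),
    (u'2013-10-09', 20.76923076923077),
    (u'2013-10-08', 11.842105263157894),
    (u'2013-10-13', 32.32514177693762),
    (u'2013-10-13', 26.249999999999996),
    (u'2013-10-13', 10.693069306930692),
])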

Now the following code sequence is a less-than-optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible, but -- as you'll see in the answer section -- there is a more concise and efficient way.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. COUNT)
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]
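
As an aside, another less-than-optimal variant you will often see for this same task is groupByKey(). It is not from the original question, just a sketch for comparison, and it is generally discouraged on large data because it shuffles every value for a key across the cluster before averaging:

# Sketch only: groupByKey() materializes all values per key before averaging,
# which is shuffle-heavy compared to the aggregateByKey() approach below.
averages = (rdd1
            .groupByKey()
            .mapValues(list)                                # pull each key's values into a list
            .mapValues(lambda vals: sum(vals) / len(vals))  # average per key
            .collect())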

Solution

Now a much better way to do this is to use the rdd.aggregateByKey() method. Because this method is so poorly documented in the Apache Spark with Python documentation -- which is why I wrote this Q&A -- until recently I had been using the above code sequence. But again, the old approach is less efficient, so avoid it unless necessary.

Here's how to do the same using the rdd.aggregateByKey() method (recommended):

By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute), and COUNT (the denominator for the average that we want to compute):

>>> aTuple = (0,0) # The zero value: a (runningSum, runningCount) accumulator that aggregateByKey() starts from for each KEY.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

Where the following is true about the meaning of each a and b pair above, so you can visualize what's happening (a pure-Python sketch follows this list):

   First lambda expression, for the Within-Partition Reduction Step:
   a: is a TUPLE that holds (runningSum, runningCount).
   b: is a SCALAR that holds the next Value.

   Second lambda expression, for the Cross-Partition Reduction Step:
   a: is a TUPLE that holds (runningSum, runningCount).
   b: is a TUPLE that holds (nextPartitionsSum, nextPartitionsCount).
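
If it helps to see those two lambdas outside of Spark, here is a pure-Python sketch that simulates what aggregateByKey() does for a single key. The three values are the u'2013-10-13' entries from the sample; the split into two partitions is made up purely for illustration:

# Pure-Python simulation of the two reduction steps (no Spark required).
seq_op  = lambda a, b: (a[0] + b,    a[1] + 1)      # within-partition: fold in one scalar value
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])   # cross-partition: merge two (sum, count) tuples

# Hypothetical partition split of the u'2013-10-13' values from the sample.
partition1 = [32.32514177693762, 26.249999999999996]
partition2 = [10.693069306930692]

acc1 = (0, 0)
for value in partition1:
    acc1 = seq_op(acc1, value)       # (runningSum, runningCount) for partition 1

acc2 = (0, 0)
for value in partition2:
    acc2 = seq_op(acc2, value)       # (runningSum, runningCount) for partition 2

total = comb_op(acc1, acc2)          # (sum, count) across both partitions
print(total[0] / total[1])           # the average for u'2013-10-13'

That final division is exactly what the mapValues() step below performs for every KEY.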

Finally, calculate the average for each KEY, and collect results.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]
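
As a side note that is not part of the original answer: in current PySpark the same per-key average is often expressed with the DataFrame API. A sketch, assuming a SparkSession already exists and that rdd1 still holds the raw (date, value) pairs from the sample (not the (sum, count) tuples produced above):

from pyspark.sql import functions as F

# Sketch: convert the pair RDD to a DataFrame and let Spark compute the averages.
df = rdd1.toDF(["date", "value"])     # requires an active SparkSession
avg_df = df.groupBy("date").agg(F.avg("value").alias("avg_value"))
avg_df.show()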

I hope this question and answer with aggregateByKey() will help.

