Spark RDD: How to calculate statistics most efficiently?


Question


Assuming the existence of an RDD of tuples similar to the following:

(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...


What is the most efficient (and, ideally, distributed) way to compute statistics corresponding to each key? (At the moment, I am looking to calculate standard deviation / variance, in particular.) As I understand it, my options amount to:

  1. Use the colStats function in MLLib: This approach has the advantage of being easily adaptable to other mllib.stat functions later, if other statistical computations are deemed necessary. However, it operates on an RDD of Vector containing the data for each column, so as I understand it, this approach would require that the full set of values for each key be collected on a single node, which would seem non-ideal for large data sets. Does a Spark Vector always imply that the data in the Vector is resident locally, on a single node?
  2. Perform a groupByKey, then stats: Likely shuffle-heavy, as a result of the groupByKey operation.
  3. Perform aggregateByKey, initializing a new StatCounter, and using StatCounter::merge as the sequence and combiner functions: This is the approach recommended by this StackOverflow answer, and avoids the groupByKey from option 2. However, I haven't been able to find good documentation for StatCounter in PySpark.
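For concreteness, here is a minimal sketch of what I imagine Option 3 would look like in PySpark, assuming pyspark.statcounter.StatCounter exposes merge (fold in one value) and mergeStats (combine two counters) the same way the Scala StatCounter does:

from pyspark.statcounter import StatCounter

statsByKey = rdd.aggregateByKey(
    StatCounter(),                        # fresh accumulator per key
    lambda acc, value: acc.merge(value),  # fold one value into the accumulator
    lambda a, b: a.mergeStats(b))         # combine accumulators across partitions

statsByKey.mapValues(lambda s: (s.mean(), s.variance(), s.stdev())).collect()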


I like Option 1 because it makes the code more extensible, in that it could easily accommodate more complicated calculations using other MLLib functions with similar contracts, but if the Vector inputs inherently require that the data sets be collected locally, then it limits the data sizes on which the code can effectively operate. Between the other two, Option 3 looks more efficient because it avoids the groupByKey, but I was hoping to confirm that that is the case.
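For reference, this is roughly the colStats interface I am referring to (a sketch assuming pyspark.mllib.stat.Statistics and pyspark.mllib.linalg.Vectors); note that it summarizes each column across an RDD of Vectors, not values per key:

from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors

# Each Vector is one row; colStats returns column-wise summary statistics.
vectors = sc.parallelize([Vectors.dense([1.0]),
                          Vectors.dense([4.0]),
                          Vectors.dense([5.0])])
summary = Statistics.colStats(vectors)
summary.mean(), summary.variance(), summary.count()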


Are there any other options I haven't considered? (I am currently using Python + PySpark, but I'm open to solutions in Java/Scala as well, if there is a language difference.)

Answer


You can try reduceByKey. It's pretty straightforward if we only want to compute the min():

rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]


To calculate the mean, you'll first need to create (value, 1) tuples, which we use to calculate both the sum and the count in the reduceByKey operation. Lastly, we divide the sum by the count to arrive at the mean:

meanRDD = (rdd
           .mapValues(lambda x: (x, 1))
           .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
           .mapValues(lambda x: x[0]/x[1]))

meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]


For the variance, you can use the formula (sumOfSquares/count) - (sum/count)^2, which we translate in the following way:

varRDD = (rdd
          .mapValues(lambda x: (1, x, x*x))
          .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
          .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))

varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]
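If you also need the standard deviation mentioned in the question, it is just the square root of the per-key variance computed above (a small follow-up assuming the varRDD from the previous snippet):

from math import sqrt

stdevRDD = varRDD.mapValues(sqrt)
stdevRDD.collect()
#Out (approximately): [('key3', 3.5), ('key2', 2.0), ('key1', 1.6997)]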


I used values of type double instead of int in the dummy data to accurately illustrate computing the average and variance:

rdd = sc.parallelize([("key1", 1.0),
                      ("key3", 9.0),
                      ("key2", 3.0),
                      ("key1", 4.0),
                      ("key1", 5.0),
                      ("key3", 2.0),
                      ("key2", 7.0)])

