Spark RDD: How to calculate statistics most efficiently?

Question

Assuming the existence of an RDD of tuples similar to the following:

(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...

What is the most efficient (and, ideally, distributed) way to compute statistics corresponding to each key? (At the moment, I am looking to calculate standard deviation / variance, in particular.) As I understand it, my options amount to:

  1. Use the colStats function in MLLib: This approach has the advantage of being easily adaptable to other mllib.stat functions later, if other statistical computations are deemed necessary. However, it operates on an RDD of Vector containing the data for each column, so as I understand it, this approach would require that the full set of values for each key be collected on a single node, which seems non-ideal for large data sets. Does a Spark Vector always imply that the data in the Vector resides locally, on a single node?
  2. Perform a groupByKey, then stats: Likely shuffle-heavy, as a result of the groupByKey operation.
  3. Perform aggregateByKey, initializing a new StatCounter, and using StatCounter::merge as the sequence and combiner functions: This is the approach recommended by this StackOverflow answer, and it avoids the groupByKey from option 2. However, I haven't been able to find good documentation for StatCounter in PySpark. (A rough sketch of what I have in mind follows this list.)
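
For reference, here is a rough sketch of what I understand option 3 to look like in PySpark, assuming pyspark.statcounter.StatCounter behaves like its Scala counterpart (merge folds in a single value, mergeStats combines two partial accumulators); please correct me if this is not the intended usage:

from pyspark.statcounter import StatCounter

# One StatCounter accumulator per key; the zero value is copied for each key,
# so mutating it inside the functions is safe.
statsByKey = rdd.aggregateByKey(
    StatCounter(),
    lambda acc, value: acc.merge(value),       # fold one value into the accumulator
    lambda acc1, acc2: acc1.mergeStats(acc2))  # combine accumulators from different partitions

# Read off whichever statistics are needed per key.
statsByKey.mapValues(lambda s: (s.mean(), s.variance(), s.stdev())).collect()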

I like Option 1 because it makes the code more extensible, in that it could easily accommodate more complicated calculations using other MLLib functions with similar contracts, but if the Vector inputs inherently require that the data sets be collected locally, then it limits the data sizes on which the code can effectively operate. Between the other two, Option 3 looks more efficient because it avoids the groupByKey, but I was hoping to confirm that that is the case.

Are there any other options I haven't considered? (I am currently using Python + PySpark, but I'm open to solutions in Java/Scala as well, if there is a language difference.)

Answer

You can try reduceByKey. It's pretty straightforward if we only want to compute the min():

rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]

To calculate the mean, you'll first need to create (value, 1) tuples, which we use to compute both the sum and the count in the reduceByKey operation. Finally, we divide the sum by the count to arrive at the mean:

meanRDD = (rdd
           .mapValues(lambda x: (x, 1))
           .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
           .mapValues(lambda x: x[0]/x[1]))

meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]

For the variance, you can use the formula (sumOfSquares/count) - (sum/count)^2, which we translate in the following way:

varRDD = (rdd
          .mapValues(lambda x: (1, x, x*x))
          .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
          .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))

varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]

I used values of type double instead of int in the dummy data to accurately illustrate computing the average and variance:

rdd = sc.parallelize([("key1", 1.0),
                      ("key3", 9.0),
                      ("key2", 3.0),
                      ("key1", 4.0),
                      ("key1", 5.0),
                      ("key3", 2.0),
                      ("key2", 7.0)])
