How to find median using Spark


Question

How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD contains approximately 700,000 elements and is therefore too large to collect and find the median.

This question is similar to this question: How can I calculate exact median with Apache Spark? (http://stackoverflow.com/questions/28158729/how-can-i-calculate-exact-median-with-apache-spark). However, the answer to that question uses Scala, which I do not know.

Using the thinking from the Scala answer, I am trying to write a similar answer in Python.

I know I first want to sort the RDD. I do not know how. I see the sortBy (sorts this RDD by the given keyfunc) and sortByKey (sorts this RDD, which is assumed to consist of (key, value) pairs) methods. I think both use (key, value) pairs, and my RDD only has integer elements.

  1. First, I was thinking of doing myrdd.sortBy(lambda x: x)?
  2. Next I will find the length of the rdd (rdd.count()).
  3. Finally, I want to find the element or 2 elements at the center of the RDD. I need help with this method too (a rough sketch of the plan follows below).
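
For concreteness, a rough sketch of these three steps might look like the following (an untested outline under the assumption that myrdd is the integer RDD, not a confirmed solution):

sorted_rdd = myrdd.sortBy(lambda x: x)  # step 1: sort the values
n = sorted_rdd.count()                  # step 2: length of the RDD

# Step 3: key each value by its rank, then take the middle element (odd n)
# or the average of the two middle elements (even n).
indexed = sorted_rdd.zipWithIndex().map(lambda vi: (vi[1], vi[0]))
if n % 2 == 1:
    median = indexed.lookup(n // 2)[0]
else:
    median = (indexed.lookup(n // 2 - 1)[0] + indexed.lookup(n // 2)[0]) / 2.0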

EDIT:

I had an idea. Maybe I can index my RDD and then key = index and value = element. And then I can try to sort by value? I don't know if this is possible because there is only a sortByKey method.
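
A hypothetical sketch of that idea: zipWithIndex yields (element, index) pairs, so one swap gives key = index, value = element, and a second swap puts the value back in the key slot where sortByKey can order by it:

keyed = myrdd.zipWithIndex().map(lambda ei: (ei[1], ei[0]))  # (index, element)
by_value = keyed.map(lambda kv: (kv[1], kv[0])).sortByKey()  # sorted by element value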

Answer

Spark 2.0+:

You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python:

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)

where the last parameter is the relative error. The lower the number, the more accurate the result and the more expensive the computation.
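
For example, a minimal sketch assuming the integer RDD from the question (rdd) and an active SparkSession; the single column is named "x" to match the snippets above:

df = rdd.map(lambda x: (float(x), )).toDF(["x"])
df.approxQuantile("x", [0.5], 0.25)  # fast, approximate median
df.approxQuantile("x", [0.5], 0.0)   # relativeError = 0 computes the exact quantile, at higher cost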

Python:

As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, as in your case, then simply collect and compute the median locally:

import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

It takes around 0.01 seconds on my few-years-old computer and around 5.5 MB of memory.

If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess up anything):

import time

import numpy as np
from numpy import floor

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1].

    :param rdd: a numeric RDD
    :param p: quantile (between 0 and 1)
    :param sample: fraction of the RDD to use; if not provided, the whole dataset is used
    :param seed: random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    # Sort the values and key each one by its rank so it can be looked up by position.
    rddSortedWithIndex = (rdd
        .sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda xi: (xi[1], xi[0]))
        .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    # Fetch the values at the two neighboring ranks and interpolate linearly.
    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(floor(h)) + np.array([0, 1]))

    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

And some tests:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
## (750069.25, 750069.25)

Finally, let's define median:

from functools import partial
median = partial(quantile, p=0.5)
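
Called on the rdd defined above, it reproduces the earlier result:

median(rdd)
## 500184.5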

So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?

Language independent (Hive UDAF):

If you use HiveContext you can also use Hive UDAFs. With integral values:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

With continuous values:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

