How to find median and quantiles using Spark


Problem description

How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD has approximately 700,000 elements and is therefore too large to collect and find the median.

This question is similar to this question. However, the answer to that question uses Scala, which I do not know.

How can I calculate exact median with Apache Spark?

Using the thinking for the Scala answer, I am trying to write a similar answer in Python.

I know I first want to sort the RDD. I do not know how. I see the sortBy (sorts this RDD by the given keyfunc) and sortByKey (sorts this RDD, which is assumed to consist of (key, value) pairs) methods. I think both use key-value pairs, and my RDD only has integer elements.

  1. First, I was thinking of doing myrdd.sortBy(lambda x: x)?
  2. Next I will find the length of the rdd (rdd.count()).
  3. Finally, I want to find the element or 2 elements at the center of the rdd. I need help with this method too.

I had an idea. Maybe I can index my RDD and then key = index and value = element. And then I can try to sort by value? I don't know if this is possible because there is only a sortByKey method.

Answer

Ongoing work

SPARK-30569 - Add DSL functions invoking percentile_approx

Spark 2.0+:

You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python:

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)
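
The snippets above assume the data already sits in a DataFrame with a numeric column "x". A hedged sketch of getting there from an integer RDD like the one in the question (the RDD name rdd and the column name "x" are assumptions for illustration):

# Build a one-column DataFrame from an integer RDD and ask for an approximate median
df = rdd.map(lambda x: (float(x), )).toDF(["x"])
df.approxQuantile("x", [0.5], 0.25)  # returns a one-element list with the approximate median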

where the last parameter is the relative error. The lower the number, the more accurate the results and the more expensive the computation.
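
Per the Spark documentation, passing 0.0 as the relative error requests exact quantiles, at a correspondingly higher cost; a small sketch against the hypothetical df above:

df.approxQuantile("x", [0.5], 0.0)  # relativeError = 0.0: exact result, more expensive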

Since Spark 2.2 (SPARK-14352) it supports estimation on multiple columns:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

The underlying methods can also be used in SQL aggregation (both global and grouped) using the approx_percentile function:

> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
 [10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
 10.0
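
A grouped aggregation works the same way; a sketch with a hypothetical table df and columns x and key (these names are illustrative, not from the examples above):

> SELECT key, approx_percentile(x, 0.5, 100) FROM df GROUP BY key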

Spark < 2.0

Python

As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, as in your case, then simply collect and compute the median locally:

import numpy as np

np.random.seed(323)
# ~700,000 random integers, comparable to the size described in the question
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

# collect to the driver and compute the median locally (%time is an IPython magic)
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

It takes around 0.01 seconds on my few-years-old computer and around 5.5 MB of memory.

If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess up anything):

from numpy import floor
import numpy as np
import time

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1].

    :param rdd: a numeric RDD
    :param p: quantile (between 0 and 1)
    :param sample: fraction of the rdd to use; if not provided the whole dataset is used
    :param seed: random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    # Sort the values and key each one by its rank so it can be looked up by position
    rddSortedWithIndex = (rdd
        .sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda xi: (xi[1], xi[0]))
        .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    # Fetch the elements at ranks floor(h) and floor(h) + 1, then interpolate linearly
    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(floor(h)) + np.array([0, 1]))

    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

And some tests:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
## (750069.25, 750069.25)

And finally define the median:

from functools import partial
median = partial(quantile, p=0.5)
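
Calling it on the example RDD should agree with the earlier test (a usage sketch, not part of the original answer):

median(rdd)
## 500184.5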

So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?

Language independent (Hive UDAF):

If you use HiveContext you can also use Hive UDAFs. With integral values:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

With continuous values:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

In percentile_approx you can pass an additional argument which determines the number of records to use.
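
For example, reusing the df table registered above (the accuracy value 10000 is an arbitrary choice for illustration):

# the third argument controls how many records percentile_approx uses for the estimate
sqlContext.sql("SELECT percentile_approx(x, 0.5, 10000) FROM df")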
