How to find median and quantiles using Spark


Question

How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD has approximately 700,000 elements and is therefore too large to collect and find the median.

This question is similar to this one. However, the answer to that question uses Scala, which I do not know.

How can I calculate exact median with Apache Spark?

Using the thinking from the Scala answer, I am trying to write a similar answer in Python.

I know I first want to sort the RDD. I do not know how. I see the sortBy (sorts this RDD by the given keyfunc) and sortByKey (sorts this RDD, which is assumed to consist of (key, value) pairs) methods. I think both use key-value pairs, and my RDD only has integer elements.

  1. First, I was thinking of doing myrdd.sortBy(lambda x: x)?
  2. Next I will find the length of the rdd (rdd.count()).
  3. Finally, I want to find the element or two elements at the center of the rdd. I need help with this step too.

I had an idea. Maybe I can index my RDD, with key = index and value = element, and then try to sort by value? I don't know if this is possible because there is only a sortByKey method.
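
In code, that idea would look roughly like the following untested sketch (reusing the myrdd from step 1 above):

# Sketch only: sort, attach positions, then key by position so the center can be looked up
indexed = myrdd.sortBy(lambda x: x).zipWithIndex().map(lambda xi: (xi[1], xi[0]))
n = indexed.count()
middle = indexed.lookup(n // 2)  # element(s) near the center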

Answer

Ongoing work

SPARK-30569 - Add DSL functions invoking percentile_approx

Spark 2.0+:

You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python:

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)

where the last parameter is the relative error. The lower the number, the more accurate the result and the more expensive the computation.
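
For example, starting from an RDD of integers like the one in the question, a minimal sketch could look like this (it assumes Spark 2.0+ and that the integer RDD rdd and a SparkSession are already set up):

# Build a single-column DataFrame from the integer RDD (assumed to exist as `rdd`)
df = rdd.map(lambda x: (float(x),)).toDF(["x"])

# Median and quartiles with a 1% relative error (smaller error = more accurate but slower)
median = df.approxQuantile("x", [0.5], 0.01)[0]
q1, q3 = df.approxQuantile("x", [0.25, 0.75], 0.01)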

Since Spark 2.2 (SPARK-14352) it supports estimation on multiple columns:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

The underlying methods can also be used in SQL aggregation (both global and grouped) using the approx_percentile function:

> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
 [10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
 10.0
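
The same function works in a grouped aggregation as well. A hypothetical sketch (assuming Spark 2.1+ and a DataFrame df with columns g and x):

from pyspark.sql import functions as F

# Approximate median of x per group g; 100 is the accuracy parameter
df.groupBy("g").agg(F.expr("approx_percentile(x, 0.5, 100)").alias("median_x")).show()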

Spark < 2.0

Python

As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, as in your case, then simply collect and compute the median locally:

import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

It takes around 0.01 second on my few-years-old computer and around 5.5 MB of memory.

If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess up anything):

import time

import numpy as np


def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p in [0, 1].

    :param rdd: a numeric RDD
    :param p: quantile (between 0 and 1)
    :param sample: fraction of the RDD to use; if not provided the whole dataset is used
    :param seed: random number generator seed, used together with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else int(time.time())
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    # Sort, attach positions, and key by position so lookup(i) returns the i-th value
    rddSortedWithIndex = (rdd
        .sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda xi: (xi[1], xi[0]))
        .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p  # fractional rank, interpolating between the two closest ranks

    # Values at the ranks just below and just above h
    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in (int(np.floor(h)), int(np.floor(h)) + 1))

    return rddX + (h - np.floor(h)) * (rddXPlusOne - rddX)

And a few tests:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
## (750069.25, 750069.25)

Finally, let's define the median:

from functools import partial
median = partial(quantile, p=0.5)
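
For example (the sample argument trades accuracy for speed; the values below are arbitrary):

median(rdd)                         # full sort over the whole RDD
median(rdd, sample=0.1, seed=42)    # estimate from a 10% sample
quantile(rdd, 0.95, sample=0.05)    # 95th percentile from a 5% sample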

So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?

Language independent (Hive UDAF):

If you use HiveContext you can also use Hive UDAFs. With integral values:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

With continuous values:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

In percentile_approx you can pass an additional argument which determines the number of records to use.
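
For example (a sketch; 10000 is an arbitrary value here, and a higher value means a more accurate but more expensive estimate):

sqlContext.sql("SELECT percentile_approx(x, 0.5, 10000) FROM df")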

