How to find median and quantiles using Spark
Problem description
How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD is approximately 700,000 elements and therefore too large to collect and find the median.
This question is similar to this question. However, the answer to that question uses Scala, which I do not know.

Using the thinking behind the Scala answer, I am trying to write a similar answer in Python.
I know I first want to sort the RDD. I do not know how. I see the sortBy (sorts this RDD by the given keyfunc) and sortByKey (sorts this RDD, which is assumed to consist of (key, value) pairs) methods. I think both use key-value pairs, and my RDD only has integer elements.
- First, I was thinking of doing myrdd.sortBy(lambda x: x)?
- Next I will find the length of the rdd (rdd.count()).
- Finally, I want to find the element or two elements at the center of the rdd. I need help with this method as well.
I had an idea. Maybe I can index my RDD and then use key = index and value = element. And then I can try to sort by value? I don't know if this is possible because there is only a sortByKey method.
Recommended answer
Work in progress

SPARK-30569 - Add DSL functions invoking percentile_approx
You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:
Python:
df.approxQuantile("x", [0.5], 0.25)
Scala:
df.stat.approxQuantile("x", Array(0.5), 0.25)
where the last parameter is the relative error. The lower the number, the more accurate the results and the more expensive the computation.
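To make "relative error" concrete: with relative error eps, approxQuantile may return a value whose rank differs from the exact quantile's rank by up to eps * n. A small numpy sketch of that guarantee check (this only verifies a result; it is not the Greenwald-Khanna algorithm itself):

```python
import numpy as np

def rank_error(data, estimate, q):
    """Return the rank error (as a fraction of n) of an estimated q-quantile."""
    xs = np.sort(np.asarray(data, dtype=float))
    n = len(xs)
    exact_rank = q * (n - 1)                  # rank of the exact quantile
    est_rank = np.searchsorted(xs, estimate)  # rank position of the estimate
    return abs(est_rank - exact_rank) / n
```

With relative error 0.25 on 700,000 rows, the answer may thus be off by up to 175,000 ranks, which is why much smaller values are typically used in practice.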
Since Spark 2.2 (SPARK-14352) it supports estimation on multiple columns:
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
and
df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
The underlying methods can also be used in SQL aggregation (both global and grouped) using the approx_percentile function:
> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
[10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
10.0
Spark < 2.0
Python
As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, as in your case, then simply collect and compute the median locally:
import numpy as np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes
It takes around 0.01 seconds on my few-years-old computer and uses around 5.5 MB of memory.
If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess anything up):
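The sample-and-compute-locally route can be sketched without Spark at all; with an RDD you would replace the rng.choice call below with rdd.sample(False, fraction, seed).collect(). The function name and default fraction are my own illustration:

```python
import numpy as np

def sampled_median(data, fraction=0.1, seed=323):
    """Estimate the median from a uniform random sample of the data."""
    rng = np.random.default_rng(seed)
    size = max(1, int(len(data) * fraction))
    sample = rng.choice(data, size=size, replace=False)
    return np.median(sample)
```

For 700,000 roughly uniformly distributed integers, a 10% sample typically puts the estimated median within a fraction of a percent of the exact one.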
import time
import numpy as np

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1].

    :param rdd: a numeric RDD
    :param p: quantile (between 0 and 1)
    :param sample: fraction of the RDD to use; if not provided, use the whole dataset
    :param seed: random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    rddSortedWithIndex = (rdd
        .sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda xi: (xi[1], xi[0]))  # (index, value) pairs
        .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p  # fractional rank of the quantile

    # Values at the two ranks surrounding h (upper index clamped so p == 1 works)
    i = int(np.floor(h))
    rddX = rddSortedWithIndex.lookup(i)[0]
    rddXPlusOne = rddSortedWithIndex.lookup(min(i + 1, n - 1))[0]

    # Linear interpolation between the two surrounding values
    return rddX + (h - i) * (rddXPlusOne - rddX)
And some tests:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
## (750069.25, 750069.25)
Finally, let's define the median:
from functools import partial
median = partial(quantile, p=0.5)
So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?
Language independent (Hive UDAF):

If you use HiveContext you can also use Hive UDAFs. With integral values:
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
With continuous values:
sqlContext.sql("SELECT percentile(x, 0.5) FROM df")
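As far as I know, Hive's exact percentile interpolates linearly between the two surrounding values, the same rule numpy.percentile uses by default. A hypothetical pure-Python equivalent, handy for cross-checking Spark results on a small sample:

```python
import numpy as np

def exact_percentile(values, p):
    """Exact p-quantile with linear interpolation (matches np.percentile(values, 100 * p))."""
    xs = np.sort(np.asarray(values, dtype=float))
    h = (len(xs) - 1) * p          # fractional rank
    lo = int(np.floor(h))
    hi = min(lo + 1, len(xs) - 1)  # clamp so p == 1 works
    return xs[lo] + (h - lo) * (xs[hi] - xs[lo])
```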
In percentile_approx you can pass an additional argument which determines the number of records to use.