Random numbers generation in PySpark
Problem description
Let's start with a simple function which always returns a random integer:
import numpy as np

def f(x):
    return np.random.randint(1000)
and an RDD filled with zeros and mapped using f:
rdd = sc.parallelize([0] * 10).map(f)
Since the above RDD is not persisted I expect I'll get a different output every time I collect:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
If we ignore the fact that the distribution of values doesn't really look random, that is more or less what happens. The problem starts when we take only the first element:
assert len(set(rdd.first() for _ in xrange(100))) == 1
or
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
It seems to return the same number each time. I've been able to reproduce this behavior on two different machines with Spark 1.2, 1.3 and 1.4. Here I am using np.random.randint but it behaves the same way with random.randint.
This issue, same as the non-exactly-random results from collect, seems to be Python specific and I couldn't reproduce it using Scala:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
Did I miss something obvious here?
Edit:
Turns out the source of the problem is the Python RNG implementation. To quote the official documentation:
The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don’t share state.
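To illustrate that point, two Random instances keep fully independent state; drawing from one leaves the other untouched (a minimal sketch):

import random

a = random.Random(0)
b = random.Random(0)

a.randint(0, 999)                    # advances only a's internal state
assert a.getstate() != b.getstate()  # b still holds the initial state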
I assume NumPy works the same way, and rewriting f using a RandomState instance as follows
import os
import binascii
import numpy as np

def f(x, seed=None):
    seed = (
        seed if seed is not None
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)
makes it slower but solves the problem.
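As a quick sanity check (a sketch, assuming an active SparkContext named sc), repeating the earlier assertion against the rewritten f should now find more than one distinct value:

rdd = sc.parallelize([0] * 10).map(f)
# With an independent seed per call, 100 identical draws are all but impossible.
assert len(set(rdd.first() for _ in range(100))) > 1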
While the above explains the non-random results from collect, I still don't understand how it affects first / take(1) between multiple actions.
Recommended answer
So the actual problem here is relatively simple. Each subprocess in Python inherits its state from its parent:
import random

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1
Since the parent state has no reason to change in this particular scenario and workers have a limited lifespan, the state of every child will be exactly the same on each run.
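Reseeding inside the worker discards that inherited state. A minimal sketch of this approach, seeding once per partition from OS entropy (assumes an active SparkContext named sc; the function name reseeded is just for illustration):

import random

def reseeded(iterator):
    # Replace the state inherited from the parent with fresh OS entropy,
    # then draw from the reseeded module-level generator.
    random.seed()
    for _ in iterator:
        yield random.randint(0, 999)

rdd = sc.parallelize([0] * 10).mapPartitions(reseeded)

Seeding once per partition rather than once per element also avoids most of the slowdown noted in the edit above.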