Random number generation in PySpark


Problem description

Let's start with a simple function which always returns a random integer:

import numpy as np

def f(x):
    return np.random.randint(1000)

and an RDD filled with zeros and mapped using f:

rdd = sc.parallelize([0] * 10).map(f)

Since the RDD above is not persisted, I expect to get a different output every time I collect:

> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]

If we ignore the fact that the distribution of values doesn't really look random, that is more or less what happens. The problem starts when we take only the first element:

assert len(set(rdd.first() for _ in xrange(100))) == 1

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1

It seems to return the same number each time. I've been able to reproduce this behavior on two different machines with Spark 1.2, 1.3 and 1.4. Here I am using np.random.randint, but it behaves the same way with random.randint.

This issue, like the non-exactly-random results from collect, seems to be Python specific, and I couldn't reproduce it using Scala:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()

Did I miss something obvious here?

Edit:

It turns out the source of the problem is the Python RNG implementation. To quote the official documentation:

The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don't share state.
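
To illustrate that point (a minimal sketch, independent of Spark): advancing the module-level generator leaves a privately instantiated Random untouched.

import random

rng = random.Random(42)

# Advancing the hidden module-level generator ...
random.random()

# ... does not affect the private instance: its next value is exactly what
# a fresh instance with the same seed would produce.
assert rng.randint(0, 999) == random.Random(42).randint(0, 999)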

I assumed NumPy works the same way, and rewriting f to use a RandomState instance as follows

import os
import binascii

def f(x, seed=None):
    # Seed every call from OS entropy (or an explicit seed) so each
    # invocation gets its own independent RandomState.
    seed = (
        seed if seed is not None
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

makes it slower but solves the problem.
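
As a quick sanity check (a sketch, assuming the same sc as above), repeated actions over the remapped RDD now yield different values, because every call to f draws a fresh seed from OS entropy:

rdd = sc.parallelize([0] * 10).map(f)

# first() is recomputed on each action and f reseeds on every call, so 100
# invocations are virtually guaranteed to produce more than one value.
assert len(set(rdd.first() for _ in xrange(100))) > 1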

While the above explains the non-random results from collect, I still don't understand how it affects first / take(1) across multiple actions.

Answer

So the actual problem here is relatively simple. Each subprocess in Python inherits its state from its parent:

import random

# Every partition reports the same RNG state, inherited from the parent.
len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1

Since the parent state has no reason to change in this particular scenario and the workers have a limited lifespan, the state of every child will be exactly the same on each run.
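
One way to work around the inherited state (a sketch, not part of the original answer, assuming the same sc as above) is to reseed the generator explicitly inside each task, for example once per partition, so a forked worker discards whatever state it got from its parent:

import os
import random
import binascii

def reseeded(iterator):
    # Discard the state inherited from the parent process by reseeding
    # this worker from OS entropy before generating anything.
    random.seed(int(binascii.hexlify(os.urandom(4)), 16))
    for x in iterator:
        yield random.randint(0, 999)

rdd = sc.parallelize([0] * 10, 4).mapPartitions(reseeded)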
