Random number generation in PySpark


Problem description

Let's start with a simple function which always returns a random integer:

import numpy as np

def f(x):
    return np.random.randint(1000)

and an RDD filled with zeros and mapped using f:

rdd = sc.parallelize([0] * 10).map(f)

Since the above RDD is not persisted, I expect I'll get a different output every time I collect:

> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]

If we ignore the fact that the distribution of values doesn't really look random, it is more or less what happens. The problem starts when we take only the first element:

assert len(set(rdd.first() for _ in xrange(100))) == 1

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
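
As a side note, a partition-level view of the collect output makes the repeats easier to inspect (a quick diagnostic sketch; glom turns each partition into a list):

# Each inner list comes from one partition, so repeated values can
# be matched against partition (and hence worker) boundaries.
sc.parallelize([0] * 10, 4).map(f).glom().collect()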

first / take(1) seems to return the same number each time. I've been able to reproduce this behavior on two different machines with Spark 1.2, 1.3 and 1.4. Here I am using np.random.randint, but it behaves the same way with random.randint.

This issue, like the not-exactly-random results from collect, seems to be Python-specific, and I couldn't reproduce it using Scala:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()

Did I miss something obvious here?

Edit:

It turns out the source of the problem is the Python RNG implementation. To quote the official documentation:

The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don't share state.
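
A minimal sketch of what that means in practice: generators that start from identical state produce identical sequences, while separate instances evolve independently:

import random

# Identical starting state yields an identical sequence, which is
# exactly the situation the forked workers end up in.
a = random.Random(42)
b = random.Random(42)
assert [a.randint(0, 999) for _ in range(5)] == [b.randint(0, 999) for _ in range(5)]

# Independent instances don't share state: advancing one leaves the
# other untouched.
a.randint(0, 999)
assert a.getstate() != b.getstate()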

I assume NumPy works the same way, and rewriting f using a RandomState instance as follows

import os
import binascii

def f(x, seed=None):
    # Draw a fresh 32-bit seed from the OS entropy pool unless one
    # is supplied explicitly.
    seed = (
        seed if seed is not None
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

makes it slower but solves the problem.
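
If one generator per partition is acceptable rather than one per element, a cheaper variant seeds a single RandomState inside mapPartitions (a sketch; random_ints is an illustrative helper name):

import os
import binascii
import numpy as np

def random_ints(iterator):
    # Seed one RandomState per partition from the OS entropy pool,
    # then reuse it for every element in that partition.
    rs = np.random.RandomState(int(binascii.hexlify(os.urandom(4)), 16))
    for _ in iterator:
        yield rs.randint(1000)

rdd = sc.parallelize([0] * 10).mapPartitions(random_ints)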

While the above explains the not-exactly-random results from collect, I still don't understand how it affects first / take(1) across multiple actions.

Answer

So the actual problem here is relatively simple. Each subprocess in Python inherits its state from its parent:

import random

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1
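
The tasks do run in forked worker processes; they just begin from the same inherited generator state. Checking the PIDs next to the state makes this visible (an illustrative check):

import os

# The PIDs identify the worker processes handling each task; the RNG
# state above is nevertheless identical across all of them.
sc.parallelize(range(4), 4).map(lambda _: os.getpid()).collect()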

Since the parent state has no reason to change in this particular scenario and the workers have a limited lifespan, the state of every child will be exactly the same on each run.
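
This is also why re-seeding inside the task, as the RandomState rewrite above does, fixes first / take(1) as well: it discards the inherited state. The same idea with the standard library (a sketch, seeding straight from os.urandom):

import os
import random

def fresh(_):
    # Re-seed inside the task so the state inherited from the parent
    # process no longer determines the output.
    random.seed(os.urandom(8))
    return random.randint(0, 999)

rdd = sc.parallelize([0] * 10).map(fresh)

# first() now varies across repeated actions instead of repeating a
# single value.
assert len(set(rdd.first() for _ in range(100))) > 1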
