pyspark. Transformer that generates a random number generates always the same number

Question

I am trying to measure the performance impact of having to copy a dataframe from Scala to Python and back in a large pipeline. For that purpose I have created this rather artificial transformer:

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import random

class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, bogusarg=None):
        super(RandomColAdderTransformer, self).__init__()
        self.bogusarg = None
        self._setDefault(bogusarg=set())
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()
        def randGet(col): # UDF crashes with no arguments
            a = col*random.random() # Ensure we are reading and copying to python space 
            return a            # It runs only once?

        sparktype = FloatType()
        return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))

The goal of this transformer is to ensure that some numbers are generated from Python: it accesses the dataframe, does a multiplication (in Python), and then, for the next stage of the pipeline, it has to add a column to the dataframe.

However, I am seeing some weirdness. When testing my code, the same random number is generated for every row:

df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()

+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+

And then consecutive invocations of transformedDF.show() actually change the values!?

transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0|  2.9191132|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2|   x3| randFloat|
+---+---+-----+----------+
|  1|  a| 23.0| 16.033003|
|  3|  B|-23.0|-2.9191132|
+---+---+-----+----------+

Is this behavior expected? Does .show() actually trigger the computation? AFAIK I am running on a single node; surely the tasks would run in a single thread, so they would share the random seed? I know a builtin pyspark RNG exists, but it is not suitable for my purpose, as it wouldn't actually generate the data from Python space.

Answer

Well, expected is rather relative here, but it is not something that cannot be explained. In particular, the state of the RNG is inherited from the parent process. You can easily prove that by running the following simple snippet in local mode:

import random

def roll_and_get_state(*args):
    # Advance the module-level RNG once and return its internal state.
    random.random()
    return [random.getstate()]

# Collect the RNG state observed in each of 10 empty partitions.
states = sc.parallelize([], 10).mapPartitions(roll_and_get_state).collect()
len(set(states))
## 1

As you can see, each partition is using its own RNG, but all of them have the same state.

In general, ensuring correct Python RNG behavior in Spark without a serious performance penalty, especially if you need reproducible results, is rather tricky.

One possible approach is to instantiate a separate Random instance per partition, with a seed generated using cryptographically safe random data (os.urandom).
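
For instance, a minimal sketch of that idea (not from the original answer; the function name is illustrative and it reuses the toy df from the question) could use mapPartitions:

import os
import random

def add_rand(rows):
    # One Random instance per partition, seeded from cryptographically
    # safe bytes, so partitions no longer share the state inherited
    # from the driver process.
    rng = random.Random(os.urandom(16))
    for x1, x2, x3 in rows:
        yield x1, x2, x3, x3 * rng.random()

result = df.rdd.mapPartitions(add_rand).toDF(["x1", "x2", "x3", "randFloat"])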

If you need reproducible results, you can generate RNG seeds based on a global state and partition data. Unfortunately, this information is not easily accessible at runtime from Python (ignoring special cases like mapPartitionsWithIndex).
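
Using that special case, a hedged sketch of reproducible per-partition seeding might look like this (GLOBAL_SEED is an assumed application-level constant, not something defined in the original answer):

import random

GLOBAL_SEED = 42

def add_rand_reproducible(index, rows):
    # Derive a deterministic per-partition seed from the fixed global
    # seed and the partition index, so repeated runs over the same
    # partitioning produce the same values.
    rng = random.Random(GLOBAL_SEED + index)
    for x1, x2, x3 in rows:
        yield x1, x2, x3, x3 * rng.random()

result = df.rdd.mapPartitionsWithIndex(add_rand_reproducible).toDF(["x1", "x2", "x3", "randFloat"])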

Since partition-level operations are not always applicable (as in the case of a UDF), you can achieve a similar result by using a singleton module or the Borg pattern to initialize the RNG for each executor.
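
A rough sketch of that idea (the class and function names here are illustrative, not from the original answer) is a Borg-style holder that lazily creates one RNG per Python worker process, which a UDF can then use:

import os
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

class ExecutorRNG(object):
    _shared = {}  # Borg pattern: every instance shares this dict

    def __init__(self):
        self.__dict__ = self._shared
        if "rng" not in self._shared:
            # Seeded once per Python worker process from os.urandom.
            self._shared["rng"] = random.Random(os.urandom(16))

    def random(self):
        return self.rng.random()

def rand_mult(col):
    return col * ExecutorRNG().random()

result = df.withColumn("randFloat", udf(rand_mult, FloatType())(df["x3"]))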

See also:

  • Random numbers generation in PySpark
  • Filtering Spark DataFrame on new column
