Proper handling of Spark broadcast variables in a Python class


Problem description


I've been implementing a model with Spark via a Python class. I had some headaches calling class methods on an RDD defined in the class (see this question for details), but finally have made some progress. Here is an example of a class method I'm working with:

# assumes numpy is imported at module level as np
@staticmethod
def alpha_sampler(model):

    # all the variables in this block are numpy arrays or floats
    var_alpha = model.params.var_alpha
    var_rating = model.params.var_rating
    b = model.params.b
    beta = model.params.beta
    S = model.params.S
    Z = model.params.Z
    x_user_g0_inner_over_var = model.x_user_g0_inner_over_var

    def _alpha_sampler(row):
        feature_arr = row[2]
        var_alpha_given_rest = 1/((1/var_alpha) + feature_arr.shape[0]*(1/var_rating))
        i = row[0]
        items = row[1]
        O = row[3] - np.inner(feature_arr,b) - beta[items] - np.inner(S[i],Z[items])
        E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var[i] + O.sum()/var_rating)
        return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
    return _alpha_sampler


As you can see, to avoid serialization errors, I define a static method that returns a function that is in turn applied to each row of an RDD (model is the parent class here, and this is called from within another method of model):

# self.grp_user is the RDD
self.params.alpha = np.array(self.grp_user.map(model.alpha_sampler(self)).collect())
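The closure-capture pattern above can be sketched without Spark at all: the static method copies the plain values it needs out of the model into locals, and the returned function closes over only those locals, so serializing the function never drags in the parent object. Below is a minimal toy sketch of that pattern (the `Model`/`scaler` names and values are made up for illustration, not the original code):

```python
class Model:
    def __init__(self):
        # plain Python float standing in for something like model.params.var_alpha
        self.var_alpha = 2.0

    @staticmethod
    def scaler(model):
        # copy the needed value into a local; the returned closure
        # captures only this local, never the Model instance itself
        var_alpha = model.var_alpha

        def _scaler(row):
            return row * var_alpha

        return _scaler


m = Model()
f = Model.scaler(m)
result = [f(r) for r in [1, 2, 3]]  # stands in for rdd.map(Model.scaler(m))
print(result)  # [2.0, 4.0, 6.0]
```

On a real cluster the list comprehension would be an `rdd.map(...)`, but the serialization behavior of the closure is the same.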


Now, this all works fine, but is not leveraging Spark's broadcast variables at all. Ideally, all the variables I'm passing in this function (var_alpha, beta, S, etc.) could first be broadcast to the workers, so that I wasn't redundantly passing them as part of the map. But I'm not sure how to do this.


My question, then, is the following: How/where should I make these into broadcast variables such that they are available to the alpha_sampler function that I map to grp_user? One thing I believe would work is to make them globals, e.g.

global var_alpha
var_alpha = sc.broadcast(model.params.var_alpha)
# and similarly for the other variables...


Then the alpha_sampler could be much simplified:

@staticmethod
def _alpha_sampler(row):
    feature_arr = row[2]
    var_alpha_given_rest = 1/((1/var_alpha.value) + feature_arr.shape[0]*(1/var_rating.value))
    i = row[0]
    items = row[1]
    O = row[3] - np.inner(feature_arr,b.value) - beta.value[items] - np.inner(S.value[i],Z.value[items])
    E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var.value[i] + O.sum()/var_rating.value)
    return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))


But of course this is a really dangerous use of globals that I would like to avoid. Is there a better way that lets me leverage broadcast variables?

Answer


Assuming that the variables you use here are simply scalars, there is probably nothing to gain from a performance perspective, and using broadcast variables will make your code less readable. But you can either pass a broadcast variable as an argument to the static method:

class model(object):
    @staticmethod
    def foobar(a_model, mu):
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y 
        return _foobar

    def __init__(self, sc):
        self.sc = sc
        self.y = -1
        self.rdd = self.sc.parallelize([1, 2, 3])

    def get_mean(self):
        return self.rdd.mean()

    def run_foobar(self):
        mu = self.sc.broadcast(self.get_mean())
        self.data = self.rdd.map(model.foobar(self, mu))

or initialize it inside the static method:

class model(object):
    @staticmethod
    def foobar(a_model):
        mu = a_model.sc.broadcast(a_model.get_mean())
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y 
        return _foobar

    def __init__(self, sc):
        self.sc = sc
        self.y = -1
        self.rdd = self.sc.parallelize([1, 2, 3])

    def get_mean(self):
        return self.rdd.mean()

    def run_foobar(self):
        self.data = self.rdd.map(model.foobar(self))
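The broadcast-as-argument pattern can also be exercised locally, without a cluster, by standing in a minimal Broadcast-like wrapper. The `FakeBroadcast` class below is hypothetical (it only mimics the `.value` attribute of `pyspark.Broadcast`); the `model.foobar` structure mirrors the answer's first example:

```python
class FakeBroadcast:
    """Minimal stand-in for pyspark.Broadcast: wraps a value behind .value."""
    def __init__(self, value):
        self.value = value


class model(object):
    @staticmethod
    def foobar(a_model, mu):
        y = a_model.y

        def _foobar(x):
            # read the broadcast payload through .value, exactly as on a cluster
            return x - mu.value + y

        return _foobar

    def __init__(self, data):
        self.y = -1
        self.data = data  # plain list standing in for the RDD


mu = FakeBroadcast(2.0)  # on a real cluster: mu = sc.broadcast(2.0)
m = model([1, 2, 3])
mapped = [model.foobar(m, mu)(x) for x in m.data]
print(mapped)  # [-2.0, -1.0, 0.0]
```

Because the closure only captures `mu` and the local `y`, swapping `FakeBroadcast` for a real `sc.broadcast(...)` handle changes nothing in the method body.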

