Proper handling of Spark broadcast variables in a Python class
Question
I've been implementing a model with Spark via a Python class. I had some headaches calling class methods on an RDD defined in the class (see this question for details), but finally made some progress. Here is an example of a class method I'm working with:
@staticmethod
def alpha_sampler(model):
    # all the variables in this block are numpy arrays or floats
    var_alpha = model.params.var_alpha
    var_rating = model.params.var_rating
    b = model.params.b
    beta = model.params.beta
    S = model.params.S
    Z = model.params.Z
    x_user_g0_inner_over_var = model.x_user_g0_inner_over_var

    def _alpha_sampler(row):
        feature_arr = row[2]
        var_alpha_given_rest = 1/((1/var_alpha) + feature_arr.shape[0]*(1/var_rating))
        i = row[0]
        items = row[1]
        O = row[3] - np.inner(feature_arr,b) - beta[items] - np.inner(S[i],Z[items])
        E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var[i] + O.sum()/var_rating)
        return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
    return _alpha_sampler
As you can see, to avoid serialization errors, I define a static method that returns a function that is in turn applied to each row of an RDD (model is the parent class here, and this is called from within another method of model):
# self.grp_user is the RDD
self.params.alpha = np.array(self.grp_user.map(model.alpha_sampler(self)).collect())
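For context, each row of grp_user is evidently a tuple of (user index, item indices, feature array, ratings). A minimal sketch of the inner sampler running on one such row, with toy shapes and values (all dimensions and numbers below are illustrative assumptions, not the real model's data):

```python
import numpy as np

# Toy model parameters (illustrative values only)
var_alpha, var_rating = 1.0, 0.5
b = np.array([0.1, -0.2])                             # feature weights
beta = np.array([0.3, 0.1, -0.4])                     # per-item intercepts
S = np.array([[0.2, 0.0], [0.1, 0.3]])                # user factors
Z = np.array([[0.5, -0.1], [0.0, 0.2], [0.3, 0.3]])   # item factors
x_user_g0_inner_over_var = np.array([0.7, -0.2])

def _alpha_sampler(row):
    # row = (user index i, item indices, per-item feature matrix, ratings)
    i, items, feature_arr, ratings = row
    var_alpha_given_rest = 1/((1/var_alpha) + feature_arr.shape[0]*(1/var_rating))
    O = ratings - np.inner(feature_arr, b) - beta[items] - np.inner(S[i], Z[items])
    E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var[i] + O.sum()/var_rating)
    return np.random.normal(E_alpha_given_rest, np.sqrt(var_alpha_given_rest))

# One toy row: user 0 rated items 0 and 2
row = (0, np.array([0, 2]), np.eye(2), np.array([1.5, -0.5]))
sample = _alpha_sampler(row)  # a single draw from the conditional posterior
```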
Now, this all works fine, but it is not leveraging Spark's broadcast variables at all. Ideally, all the variables I'm passing into this function (var_alpha, beta, S, etc.) could first be broadcast to the workers, so that I wasn't redundantly shipping them as part of the map. But I'm not sure how to do this.
My question, then, is the following: how/where should I make these into broadcast variables such that they are available to the alpha_sampler function that I map over grp_user? One thing I believe would work is to make them globals, e.g.
global var_alpha
var_alpha = sc.broadcast(model.params.var_alpha)
# and similarly for the other variables...
Then the alpha_sampler could be much simplified:
@staticmethod
def _alpha_sampler(row):
    feature_arr = row[2]
    var_alpha_given_rest = 1/((1/var_alpha.value) + feature_arr.shape[0]*(1/var_rating.value))
    i = row[0]
    items = row[1]
    O = row[3] - np.inner(feature_arr,b.value) - beta.value[items] - np.inner(S.value[i],Z.value[items])
    E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var.value[i] + O.sum()/var_rating.value)
    return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
But of course this is a really dangerous use of globals that I would like to avoid. Is there a better way that lets me leverage broadcast variables?
Answer
Assuming that the variables you use here are simply scalars, there is probably nothing to gain from a performance perspective, and using broadcast variables will make your code less readable. But you can either pass a broadcast variable as an argument to the static method:
class model(object):
    @staticmethod
    def foobar(a_model, mu):
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y
        return _foobar

    def __init__(self, sc):
        self.sc = sc
        self.y = -1
        self.rdd = self.sc.parallelize([1, 2, 3])

    def get_mean(self):
        return self.rdd.mean()

    def run_foobar(self):
        mu = self.sc.broadcast(self.get_mean())
        self.data = self.rdd.map(model.foobar(self, mu))
or initialize it there:
class model(object):
    @staticmethod
    def foobar(a_model):
        mu = a_model.sc.broadcast(a_model.get_mean())
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y
        return _foobar

    def __init__(self, sc):
        self.sc = sc
        self.y = -1
        self.rdd = self.sc.parallelize([1, 2, 3])

    def get_mean(self):
        return self.rdd.mean()

    def run_foobar(self):
        self.data = self.rdd.map(model.foobar(self))
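Applying the first pattern back to the original sampler, the broadcast handles can simply be extra arguments to the static method. Below is a sketch of that shape, using a trivial stand-in for pyspark's Broadcast (just an object exposing .value) so the closure logic can be demonstrated without a running cluster; the bc dict, its keys, and all values are illustrative assumptions:

```python
import numpy as np

class FakeBroadcast(object):
    """Minimal stand-in for pyspark's Broadcast: exposes .value only."""
    def __init__(self, value):
        self.value = value

def alpha_sampler(bc):
    # bc: dict of broadcast handles; on a real driver this would be built as
    #   bc = {name: sc.broadcast(arr) for name, arr in params.items()}
    def _alpha_sampler(row):
        i, items, feature_arr, ratings = row
        var_alpha_given_rest = 1/((1/bc['var_alpha'].value)
                                  + feature_arr.shape[0]*(1/bc['var_rating'].value))
        O = (ratings - np.inner(feature_arr, bc['b'].value)
             - bc['beta'].value[items]
             - np.inner(bc['S'].value[i], bc['Z'].value[items]))
        E = var_alpha_given_rest * (bc['x_user_g0_inner_over_var'].value[i]
                                    + O.sum()/bc['var_rating'].value)
        return np.random.normal(E, np.sqrt(var_alpha_given_rest))
    return _alpha_sampler

# Fake the broadcasts with toy values (illustrative only)
bc = {k: FakeBroadcast(v) for k, v in {
    'var_alpha': 1.0, 'var_rating': 0.5,
    'b': np.array([0.1, -0.2]),
    'beta': np.array([0.3, 0.1, -0.4]),
    'S': np.array([[0.2, 0.0], [0.1, 0.3]]),
    'Z': np.array([[0.5, -0.1], [0.0, 0.2], [0.3, 0.3]]),
    'x_user_g0_inner_over_var': np.array([0.7, -0.2]),
}.items()}

sampler = alpha_sampler(bc)
row = (0, np.array([0, 2]), np.eye(2), np.array([1.5, -0.5]))
draw = sampler(row)  # one posterior draw
```

With real pyspark this would be invoked as self.grp_user.map(model.alpha_sampler(bc)), so the large arrays are shipped to each worker once rather than serialized into every closure.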