thread.lock during custom parameter search class using Dask distributed


Problem description

I wrote my own parameter search implementation, mostly because I don't need the cross-validation that scikit-learn's GridSearch and RandomizedSearch perform.

I use Dask to run the search in parallel for good distributed performance.

Here is what I have:

import numpy as np
from scipy.stats import uniform

class Params(object):
    def __init__(self, fixed, loc=0.0, scale=1.0):
        self.fixed = fixed                          # parameters shared by every trial
        self.sched = uniform(loc=loc, scale=scale)  # distribution for the schedule values

    def _getsched(self, i, size):
        # the trial index doubles as the seed, so each trial is reproducible
        return self.sched.rvs(size=size, random_state=i)

    def param(self, i, size=None):
        tmp = self.fixed.copy()
        if size is None:
            size = tmp['niter']
        tmp.update({'schd': self._getsched(i, size)})
        return tmp

class Mymodel(object):
    def __init__(self, func, params_object, score, ntries, client):
        self.params = params_object
        self.func = func
        self.score = score
        self.ntries = ntries
        self.client = client    # the instance holds the Client, which contains thread locks

    def _run(self, params, train, test):
        return self.func(params, train, test, self.score)

    def build(self, train, test):
        res = []
        for i in range(self.ntries):
            cparam = self.params.param(i)
            # self._run is a bound method, so submitting it ships the whole instance
            res.append((cparam, self.client.submit(self._run, cparam, train, test)))
        self._results = res
        return res

    def compute_optimal(self, res=None):
        from operator import itemgetter
        if res is None:
            res = self._results
        self._sorted = sorted(self.client.compute(res), key=itemgetter(1))
        return self._sorted[0]


def score(test, correct):
    return np.linalg.norm(test - correct)

def myfunc(params, ldata, data, score):
    schd = params['schd']
    niter = len(schd)
    # here I do some magic after which ldata is changing
    return score(test=ldata, correct=data)

After I start dask.distributed:

from distributed import Client

scheduler_host = 'myhostname:8786'
cli = Client(scheduler_host)

I run it like this:

%%time
params = Params({'niter': 50}, loc=1.0e-06, scale=1.0)
model = Mymodel(myfunc, params, score, 100, cli)
ptdata = bad_data_example.copy()
graph = model.build(ptdata, good_data)

And get this:

distributed.protocol.pickle - INFO - Failed to serialize
<bound method Mymodel._run of <__main__.Mymodel object at 0x2b8961903050>>.
Exception: can't pickle thread.lock objects

Could you please help me to understand what is going on and how to fix this?

I'm also curious about the way I find the minimum among all the parameter results. Is there a better way to do this with Dask?

I wrote this code fairly quickly and never tried it in serial. I'm learning Dask alongside many other topics (machine learning, GPU programming, Numba, Python OOP, etc.), so this code is not optimal by any means...

P.S. To actually execute it I use this call: model.compute_optimal(). I haven't gotten there yet, due to the error above.

Solution

It looks like the main issue was that I tried to submit a bound method of a class: pickling a bound method means pickling the instance it is bound to, and here that instance holds the Client and its thread locks. I had similar issues with joblib as well. So I re-coded the problem and removed all the classes.
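For reference, here is a minimal sketch of the class-free approach, assuming the same myfunc, score, and Params pieces from the question; run_trial and search are illustrative names, not the exact code I ended up with. Only module-level functions get submitted, so nothing unpicklable is captured, and it also shows one straightforward way to take the minimum over all trials:

from operator import itemgetter

def run_trial(params, train, test):
    # a plain module-level function pickles cleanly: no instance, no Client attached
    return myfunc(params, train, test, score)

def search(client, params_object, train, test, ntries):
    trials = [params_object.param(i) for i in range(ntries)]
    futures = [client.submit(run_trial, p, train, test) for p in trials]
    scores = client.gather(futures)   # block until every trial has finished
    # pair each parameter set with its score and keep the one with the smallest score
    return min(zip(trials, scores), key=itemgetter(1))

# usage, e.g.:
# best_params, best_score = search(cli, params, ptdata, good_data, ntries=100)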

A follow-up question regarding optimization is posted here: Parameter search using dask

I'll definitely use dask-searchcv in my work when I need cross-validation, but for now this is really just a simple search for an optimal solution, so I had to create my own implementation...
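For anyone who does need cross-validation, a rough sketch of what dask-searchcv usage looks like; the Ridge estimator and the parameter grid below are placeholders, not part of the original problem. dask-searchcv mirrors scikit-learn's search API, and with a distributed Client active the fits run on the cluster:

import dask_searchcv as dcv
from sklearn.linear_model import Ridge

param_grid = {'alpha': [0.01, 0.1, 1.0, 10.0]}  # placeholder grid
search = dcv.GridSearchCV(Ridge(), param_grid)  # same interface as sklearn's GridSearchCV
# search.fit(X, y)                              # X, y: your training data
# search.best_params_, search.best_score_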
