Can I use dask.delayed on a function wrapped with ctypes?


Question

The goal is to use dask.delayed to parallelize some embarrassingly parallel sections of my code. The code involves calling a Python function that wraps a C function using ctypes. To understand the errors I was getting, I wrote a very basic example.

The C function:

double zippy_sum(double x, double y)
{
    return x + y;
}

The Python code:

from dask.distributed import Client
client = Client(n_workers = 4)
client

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

result = dask.delayed(zippy)(1., 2.)
result.compute()

The traceback:

---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
   3286 with _cache_lock:
-> 3287 result = cache_dumps[func]
   3288 except KeyError:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py in __getitem__(self, key)
   1517 def __getitem__(self, key):
-> 1518 value = super().__getitem__(key)
   1519 self.data.move_to_end(key)

~/.edm/envs/evaxi3.6/lib/python3.6/collections/__init__.py in __getitem__(self, key)
    990 return self.__class__.__missing__(self, key)
--> 991 raise KeyError(key)
    992 def __setitem__(self, key, item): self.data[key] = item

KeyError: <function zippy at 0x11ffc50d0>

During handling of the above exception, another exception occurred:

ValueError Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     40 if b"__main__" in result:
---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     42 else:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
   1147 cp = CloudPickler(file, protocol=protocol)
-> 1148 cp.dump(obj)
   1149 return file.getvalue()

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    490 try:
--> 491 return Pickler.dump(self, obj)
    492 except RuntimeError as e:

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj)
    408 self.framer.start_framing()
--> 409 self.save(obj)
    410 self.write(STOP)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    565 else:
--> 566 return self.save_function_tuple(obj)
    567

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    779 state['kwdefaults'] = func.__kwdefaults__
--> 780 save(state)
    781 write(pickle.TUPLE)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
    820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
    822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items)
    846 save(k)
--> 847 save(v)
    848 write(SETITEMS)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
    820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
    822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items)
    851 save(k)
--> 852 save(v)
    853 write(SETITEM)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    495 if reduce is not None:
--> 496 rv = reduce(self.proto)
    497 else:

ValueError: ctypes objects containing pointers cannot be pickled

Unfortunately, I still do not understand the errors! I am just getting started with dask and have only basic experience with ctypes. Does anyone have suggestions for how to tackle this, or even for understanding what needs to be tackled?

Thanks!

Solution

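Indeed, you cannot serialise a function that references a C function in its closure, its globals or its arguments. Because zippy is defined in the main script, dask serialises it by value with cloudpickle, which also tries to pickle the _zippy_sum ctypes function pointer it refers to; that is what raises the ValueError. However, if your function is in a module that all workers can import, you end up serialising just the module and function name, each worker imports the module (and so loads the library) itself, and Python does the right thing.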
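The root cause can be reproduced with the standard library alone, without dask or zippy.so. In this minimal sketch a ctypes function pointer created with CFUNCTYPE stands in for _zippy_sum (fake_zippy_sum and try_pickle are hypothetical names), and os.path.join stands in for a function that lives in an importable module. Plain pickle is used here; cloudpickle treats both objects the same way.

```python
import ctypes
import os
import pickle

# Hypothetical stand-in for _zippy_sum: a ctypes function pointer with the
# same C signature, built from a Python callback so no zippy.so is needed.
ZIPPY_SUM_T = ctypes.CFUNCTYPE(ctypes.c_double, ctypes.c_double, ctypes.c_double)
fake_zippy_sum = ZIPPY_SUM_T(lambda x, y: x + y)


def try_pickle(obj):
    """Return the pickled bytes, or the exception pickle raised."""
    try:
        return pickle.dumps(obj)
    except Exception as exc:  # capture the error instead of crashing
        return exc


# 1. The ctypes function pointer itself cannot be pickled.
err = try_pickle(fake_zippy_sum)
print(type(err).__name__, err)

# 2. A function importable from a module is pickled by reference:
#    the payload only names the module and the function.
payload = try_pickle(os.path.join)
print(len(payload), pickle.loads(payload) is os.path.join)
```

The first object fails with the same ValueError as in the traceback, while the second pickles to a short payload that only names the module and function, which is why moving zippy into an importable module works.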

Module zippy.py (somewhere on the Python path of every worker, perhaps the current directory for this example):

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

Main script:

import dask
from dask.distributed import Client

import zippy

if __name__ == "__main__":
    # if running as a script, this guard is helpful: the worker
    # processes started by Client may re-import this file
    client = Client(n_workers=4)

    result = dask.delayed(zippy.zippy)(1., 2.)
    print(result.compute())

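The other solution, if you don't want to make a module, is to load the library and set up the ctypes function inside the function itself, so that nothing unpicklable is referenced when dask serialises it. The cost is that this setup is repeated on every call.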

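import ctypes
import os

current_dir = os.getcwd()

def zippy(x, y):
    # All ctypes objects are created here, on the worker, so only picklable
    # objects (the ctypes and os modules, the current_dir string) are captured.
    _mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

    _zippy_sum = _mod.zippy_sum
    _zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    _zippy_sum.restype = ctypes.c_double

    z = _zippy_sum(x, y)

    return z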
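If reloading the library on every call is a concern, one option (not part of the original answer) is a small picklable wrapper that stores only the library path, symbol name and type information, and creates the ctypes function pointer lazily the first time it is called in each process. This is a minimal sketch: LazyCFunction is a hypothetical helper, and it assumes dask pickles it through the standard __getstate__ protocol like any other object.

```python
import ctypes
import pickle


class LazyCFunction:
    """Hypothetical picklable handle to a C function in a shared library.

    Only the library path, symbol name and ctypes type information are
    pickled; the ctypes function pointer is rebuilt on first call in
    each process.
    """

    def __init__(self, lib_path, name, argtypes, restype):
        self.lib_path = lib_path
        self.name = name
        self.argtypes = list(argtypes)
        self.restype = restype
        self._func = None  # ctypes function pointer, never pickled

    def _load(self):
        lib = ctypes.CDLL(self.lib_path)
        func = getattr(lib, self.name)
        func.argtypes = self.argtypes
        func.restype = self.restype
        return func

    def __call__(self, *args):
        if self._func is None:
            self._func = self._load()  # load once per process
        return self._func(*args)

    def __getstate__(self):
        # Drop the unpicklable pointer; it is recreated lazily after unpickling.
        state = self.__dict__.copy()
        state["_func"] = None
        return state


# Pickling works even though the C library is never loaded here.
handle = LazyCFunction("zippy.so", "zippy_sum",
                       [ctypes.c_double, ctypes.c_double], ctypes.c_double)
clone = pickle.loads(pickle.dumps(handle))
print(clone.name, clone._func)
```

With the library built, usage would mirror the question: create zippy_sum = LazyCFunction(os.path.join(current_dir, "zippy.so"), "zippy_sum", [ctypes.c_double, ctypes.c_double], ctypes.c_double) and call dask.delayed(zippy_sum)(1., 2.). The design trades a per-process load for not having to ship a separate module to the workers.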
