Can I use dask.delayed on a function wrapped with ctypes?
Problem description
The goal is to use dask.delayed to parallelize some 'embarrassingly parallel' sections of my code. The code involves calling a Python function which wraps a C function using ctypes. To understand the errors I was getting, I wrote a very basic example.
The C function:
double zippy_sum(double x, double y)
{
    return x + y;
}
The Python code:
from dask.distributed import Client
client = Client(n_workers = 4)
client
import os
import dask
import ctypes
current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double
def zippy(x, y):
    z = _zippy_sum(x, y)
    return z
result = dask.delayed(zippy)(1., 2.)
result.compute()
The Traceback:
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
   3286 with _cache_lock:
-> 3287 result = cache_dumps[func]
   3288 except KeyError:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py in __getitem__(self, key)
   1517 def __getitem__(self, key):
-> 1518 value = super().__getitem__(key)
   1519 self.data.move_to_end(key)

~/.edm/envs/evaxi3.6/lib/python3.6/collections/__init__.py in __getitem__(self, key)
    990 return self.__class__.__missing__(self, key)
--> 991 raise KeyError(key)
    992 def __setitem__(self, key, item): self.data[key] = item

KeyError: <function zippy at 0x11ffc50d0>

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     40 if b"__main__" in result:
---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     42 else:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
   1147 cp = CloudPickler(file, protocol=protocol)
-> 1148 cp.dump(obj)
   1149 return file.getvalue()

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    490 try:
--> 491 return Pickler.dump(self, obj)
    492 except RuntimeError as e:

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj)
    408 self.framer.start_framing()
--> 409 self.save(obj)
    410 self.write(STOP)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    565 else:
--> 566 return self.save_function_tuple(obj)
    567

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    779 state['kwdefaults'] = func.__kwdefaults__
--> 780 save(state)
    781 write(pickle.TUPLE)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
    820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
    822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items)
    846 save(k)
--> 847 save(v)
    848 write(SETITEMS)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
    477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
    820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
    822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items)
    851 save(k)
--> 852 save(v)
    853 write(SETITEM)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    495 if reduce is not None:
--> 496 rv = reduce(self.proto)
    497 else:

ValueError: ctypes objects containing pointers cannot be pickled
Unfortunately, I still do not understand the errors! I am just getting started with dask and only have some basic experience with ctypes. Does anyone have suggestions for how to tackle this, or even for understanding what needs to be tackled?
Thanks!
Indeed, you cannot serialise a function that references a C function (a ctypes function pointer) in its closure, its globals, or its arguments. However, if your function lives in a module that can be imported on all workers, then you end up serialising just the module and function names, and Python does the right thing by importing the module on each worker.
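To see the underlying limitation without dask, here is a minimal sketch that tries to pickle a ctypes function pointer directly. Since zippy.so is not available here, it assumes CPython and uses ctypes.pythonapi.Py_GetVersion as a stand-in for _zippy_sum; try_pickle is a hypothetical helper name, not part of any library.

```python
import ctypes
import pickle


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the exception raised."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # on CPython this is expected to be a ValueError
        return exc
    return None


# Stand-in for _zippy_sum (an assumption for this sketch): any function
# looked up on a ctypes library object is a ctypes function pointer.
c_func = ctypes.pythonapi.Py_GetVersion

print(try_pickle(1.0))     # None: plain floats pickle without trouble
print(try_pickle(c_func))  # the exception raised for the function pointer
```

A function defined in the main script or notebook is serialised together with the globals it uses, which is how _zippy_sum ends up in the pickle and triggers the error.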
Module zippy.py (somewhere on your Python path, perhaps the current directory for the example):
import os
import dask
import ctypes
current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double
def zippy(x, y):
    z = _zippy_sum(x, y)
    return z
Main script:
import dask
from dask.distributed import Client
import zippy
if __name__ == "__main__":
    # if running as a script, this guard is helpful
    client = Client(n_workers=4)
    result = dask.delayed(zippy.zippy)(1., 2.)
    print(result.compute())
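The reason the module approach works can be sketched with the standard pickle module: a function that lives in an importable module is stored as a reference (module name plus function name) instead of by value. The example below uses json.dumps as a stand-in for zippy.zippy, which is an assumption made so the snippet runs without zippy.so; as far as I know, cloudpickle treats functions from importable modules the same way.

```python
import json
import pickle

# json.dumps is a plain Python function living in an importable module,
# much like zippy.zippy in the module-based solution above.
payload = pickle.dumps(json.dumps)

# The payload holds the module and function names, not the function body.
print(b"json" in payload, b"dumps" in payload)  # True True

# Unpickling imports the module and looks the name up again.
print(pickle.loads(payload) is json.dumps)  # True
```

Because only names travel over the wire, each worker imports zippy itself and builds its own ctypes objects locally, so nothing containing a pointer ever needs to be pickled.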
The other solution, if you don't want to make a module, is to do all your C imports and definitions within the function.
def zippy(x, y):
    # Load the library inside the function so that no ctypes object
    # is captured when the function itself is pickled.
    _mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
    _zippy_sum = _mod.zippy_sum
    _zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    _zippy_sum.restype = ctypes.c_double
    z = _zippy_sum(x, y)
    return z
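A self-contained sketch of the same pattern, for trying it out without compiling zippy.so: it assumes a POSIX system and uses pow from the C math library as a stand-in for zippy_sum. The name c_pow is hypothetical and exists only for this illustration.

```python
import ctypes
import ctypes.util


def c_pow(x, y):
    # Everything ctypes-related is created inside the function, so the
    # function object itself holds no reference to a ctypes object.
    # find_library may return None; on POSIX, CDLL(None) then resolves
    # symbols already loaded into the current process.
    path = ctypes.util.find_library("m")
    lib = ctypes.CDLL(path)
    c_func = lib.pow
    c_func.argtypes = [ctypes.c_double, ctypes.c_double]
    c_func.restype = ctypes.c_double
    return c_func(x, y)


print(c_pow(2.0, 3.0))  # 8.0
```

The trade-off is that the lookup and the argtypes/restype setup are repeated on every call, which is usually acceptable when each task does a meaningful amount of work.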