Data wrangling using CPU workers and training xgboost using GPU workers with dask


Problem Description

I am trying to read 200 parquet files from HDFS and then train a model using 4 GPUs. I also have 48 vcores available on my machine. If I start the cluster with only GPU workers, the reading part is very slow (it only uses the 4 CPU threads assigned to the GPU workers, and you can't really run more workers than the number of GPUs you have unless you launch them in separate shells, at which point things get ugly because memory management becomes your own problem). I want to read the files with CPU workers, wrangle the data with CPU workers, and then train an xgboost model with GPU workers. I read the documentation here on how to start workers with different resources and assign them to different tasks. I also saw this question, but I'm a bit confused.
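
For reference, one way to stand up such a mixed cluster from the command line is sketched below; the scheduler address is a placeholder, and the exact flags for dask-cuda-worker should be checked against your dask-cuda version:

# Start the scheduler (address below is a placeholder)
dask-scheduler

# CPU-only workers, tagged with a CPU resource
dask-worker tcp://scheduler-address:8786 --resources "CPU=1"

# GPU workers (from the dask-cuda package), tagged with a GPU resource
dask-cuda-worker tcp://scheduler-address:8786 --resources "GPU=1"

Tasks submitted with resources={'GPU': 1} will then run only on the GPU-tagged workers.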

Here is the code I tried to run to read the .parquet files:

import dask.dataframe as dd

df = (
    dd.read_parquet(
        "hdfs://address/to/the/*.parquet",
        storage_options={
            "user": user,
            "kerb_ticket": kerb_ticket},
        engine='pyarrow')
    .persist()
)

This automatically uses all CPU and GPU workers, which is fine. After this, I need to create my training data and labels. Suppose I have X_train, y_train, and params. Here I convert them to dask_cudf:

import dask_cudf

X_train = dask_cudf.from_dask_dataframe(X_train)
y_train = dask_cudf.from_dask_dataframe(y_train)
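
For context, X_train and y_train would typically be split out of df before this conversion; a minimal sketch, assuming a hypothetical label column named 'target':

# 'target' is an assumed label column name, not from the original question
X_train = df.drop(columns=['target'])
y_train = df['target']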

Here is the part where I need to use only the GPU workers:

from xgboost import dask as dxgb

Xy = dxgb.DaskDMatrix(client, X_train, y_train)

Following the documentation, I should convert this to the following form:

Xy = client.submit(dxgb.DaskDMatrix, client, X_train, y_train, resources={'GPU': 1})

But then I get this error:

distributed.protocol.pickle - INFO - Failed to serialize (<Client: 'tcp://169.68.236.35:8786' processes=52 threads=52, memory=1.97 TiB>, <dask_cudf.DataFrame | 19200 tasks | 200 npartitions>, <dask_cudf.Series | 600 tasks | 200 npartitions>). Exception: cannot pickle 'socket' object
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/envs/dask/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

/envs/dask/lib/python3.8/socket.py in __getstate__(self)
    271     def __getstate__(self):
--> 272         raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
    273 

TypeError: cannot pickle 'socket' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-12-0d6a943365a9> in <module>
      1 # Xy = dxgb.DaskDMatrix(client, X_train, y_train)
      2 # Xy = dxgb.DaskDeviceQuantileDMatrix(client, X_train, y_train)
----> 3 Xy = client.submit(dxgb.DaskDMatrix, client, X_train, y_train, resources={'GPU': 1})
      4 # Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)

/envs/dask/lib/python3.8/site-packages/distributed/client.py in submit(self, func, key, workers, resources, retries, priority, fifo_timeout, allow_other_workers, actor, actors, pure, *args, **kwargs)
   1629             dsk = {skey: (func,) + tuple(args)}
   1630 
-> 1631         futures = self._graph_to_futures(
   1632             dsk,
   1633             [skey],

/envs/dask/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2646             # Pack the high level graph before sending it to the scheduler
   2647             keyset = set(keys)
-> 2648             dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   2649 
   2650             # Create futures before sending graph (helps avoid contention)

/envs/dask/lib/python3.8/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys, annotations)
   1045                     "__module__": layer.__module__,
   1046                     "__name__": type(layer).__name__,
-> 1047                     "state": layer.__dask_distributed_pack__(
   1048                         self.get_all_external_keys(),
   1049                         self.key_dependencies,

/envs/dask/lib/python3.8/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
    424             for k, v in dsk.items()
    425         }
--> 426         dsk = toolz.valmap(dumps_task, dsk)
    427         return {"dsk": dsk, "dependencies": dependencies}
    428 

/envs/dask/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/envs/dask/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/envs/dask/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
   3784             return d
   3785         elif not any(map(_maybe_complex, task[1:])):
-> 3786             return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   3787     return to_serialize(task)
   3788 

/envs/dask/lib/python3.8/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
   3793 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   3794     """Dump an object to bytes, warn if those bytes are large"""
-> 3795     b = dumps(obj, protocol=4)
   3796     if not _warn_dumps_warned[0] and len(b) > limit:
   3797         _warn_dumps_warned[0] = True

/envs/dask/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x, **dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s", x, e)

/envs/dask/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     71                 file, protocol=protocol, buffer_callback=buffer_callback
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
     75 

/envs/dask/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    561     def dump(self, obj):
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:
    565             if "recursion" in e.args[0]:

/envs/dask/lib/python3.8/socket.py in __getstate__(self)
    270 
    271     def __getstate__(self):
--> 272         raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
    273 
    274     def dup(self):

TypeError: cannot pickle 'socket' object

Does anyone know how to resolve this?

Recommended Answer

The problem is that the dask Client is not serializable (it holds open sockets), so you can't pass it as an argument to client.submit.

You can work around this by using dask.distributed.get_client to access the Client from within the task:

from dask.distributed import get_client
from xgboost import dask as dxgb

def create_dmatrix(X_train, y_train):
    # Fetch the client from inside the task instead of passing it in,
    # since a Client cannot be pickled
    client = get_client()
    return dxgb.DaskDMatrix(client, X_train, y_train)

Xy = client.submit(create_dmatrix, X_train, y_train, resources={'GPU': 1})
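
The same pattern extends to the training step, so that both the DMatrix construction and dxgb.train run on the GPU-tagged workers. A minimal sketch; num_boost_round and the contents of params are assumptions, not from the original question:

def train_model(X_train, y_train, params):
    # Fetch the client inside the task instead of serializing it
    client = get_client()
    Xy = dxgb.DaskDMatrix(client, X_train, y_train)
    # dxgb.train returns a dict holding the trained 'booster' and 'history'
    return dxgb.train(client, params, Xy, num_boost_round=100)

result = client.submit(train_model, X_train, y_train, params,
                       resources={'GPU': 1}).result()
booster = result['booster']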
