joblib connection to Dask backend: tornado.iostream.StreamClosedError: Stream is closed

Problem description

I am running a simple program on my Dask workers. Below is the program.

import numpy as np
from dask.distributed import Client


import joblib
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

client = Client('127.0.0.1:30006', timeout=10000)
client.get_versions(check=True)
import pandas as pd
digits = load_digits()

param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)


with joblib.parallel_backend('dask'):  # run the fit on the Dask workers
    search.fit(digits.data, digits.target)

30006 is the port on which my scheduler is running.

I am getting the following error.

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-42' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 317, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 306, in _to_func_args
    await maybe_to_futures(kwargs.values())))
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 289, in maybe_to_futures
    [f] = await self.client.scatter(
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\client.py", line 2084, in _scatter
    await self.scheduler.scatter(
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 852, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 635, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-44' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

Below is my package version information:

{
    "scheduler": {
        "host": {
            "python": "3.8.0.final.0",
            "python-bits": 64,
            "OS": "Linux",
            "OS-release": "5.4.72-microsoft-standard-WSL2",
            "machine": "x86_64",
            "processor": "",
            "byteorder": "little",
            "LC_ALL": "C.UTF-8",
            "LANG": "C.UTF-8"
        },
        "packages": {
            "python": "3.8.0.final.0",
            "dask": "2021.01.0",
            "distributed": "2021.01.0",
            "msgpack": "1.0.0",
            "cloudpickle": "1.6.0",
            "tornado": "6.1",
            "toolz": "0.11.1",
            "numpy": "1.18.1",
            "lz4": "3.1.1",
            "blosc": "1.9.2"
        }
    },
    "workers": {
        "tcp://10.1.1.92:37435": {
            "host": {
                "python": "3.8.0.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "5.4.72-microsoft-standard-WSL2",
                "machine": "x86_64",
                "processor": "",
                "byteorder": "little",
                "LC_ALL": "C.UTF-8",
                "LANG": "C.UTF-8"
            },
            "packages": {
                "python": "3.8.0.final.0",
                "dask": "2021.01.0",
                "distributed": "2021.01.0",
                "msgpack": "1.0.0",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.18.1",
                "lz4": "3.1.1",
                "blosc": "1.9.2"
            }
        },
        "tcp://10.1.1.93:45855": {
            "host": {
                "python": "3.8.0.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "5.4.72-microsoft-standard-WSL2",
                "machine": "x86_64",
                "processor": "",
                "byteorder": "little",
                "LC_ALL": "C.UTF-8",
                "LANG": "C.UTF-8"
            },
            "packages": {
                "python": "3.8.0.final.0",
                "dask": "2021.01.0",
                "distributed": "2021.01.0",
                "msgpack": "1.0.0",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.18.1",
                "lz4": "3.1.1",
                "blosc": "1.9.2"
            }
        },
        "tcp://10.1.1.94:36523": {
            "host": {
                "python": "3.8.0.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "5.4.72-microsoft-standard-WSL2",
                "machine": "x86_64",
                "processor": "",
                "byteorder": "little",
                "LC_ALL": "C.UTF-8",
                "LANG": "C.UTF-8"
            },
            "packages": {
                "python": "3.8.0.final.0",
                "dask": "2021.01.0",
                "distributed": "2021.01.0",
                "msgpack": "1.0.0",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.18.1",
                "lz4": "3.1.1",
                "blosc": "1.9.2"
            }
        }
    },
    "client": {
        "host": {
            "python": "3.8.6.final.0",
            "python-bits": 64,
            "OS": "Windows",
            "OS-release": "10",
            "machine": "AMD64",
            "processor": "Intel64 Family 6 Model 142 Stepping 10, GenuineIntel",
            "byteorder": "little",
            "LC_ALL": "None",
            "LANG": "None"
        },
        "packages": {
            "python": "3.8.6.final.0",
            "dask": "2021.01.0",
            "distributed": "2021.01.0",
            "msgpack": "1.0.0",
            "cloudpickle": "1.6.0",
            "tornado": "6.1",
            "toolz": "0.11.1",
            "numpy": "1.18.1",
            "lz4": "None",
            "blosc": "None"
        }
    }
}

I suspect the issue is with joblib, because if I run it without the line "with joblib.parallel_backend('dask'):", the fit command works fine. Also, I tried a simple numpy array calculation on a Dask worker and it works, so the Dask workers and the connection from my client are fine. I have tried different versions of joblib (0.16.0, 0.17.0, 1.0.0, 1.0.1) and the same error persists.
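For reference, the simple numpy check mentioned above looks roughly like this (a minimal sketch, not the exact code I ran; the double function is only an illustration):

import numpy as np
from dask.distributed import Client

client = Client('127.0.0.1:30006', timeout=10000)

# Submit a trivial numpy computation directly to the workers;
# if this returns a result, the scheduler/worker connection itself is healthy.
def double(x):
    return x * 2

future = client.submit(double, np.arange(10))
print(future.result())  # expected: [ 0  2  4  6  8 10 12 14 16 18]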

Recommended answer

The problem was a mismatch between the library versions running on the workers and on the client. I ran pip list on a worker and installed all of those libraries, pinned to the same versions, in the client's Dockerfile. Now it is working and I am able to run the fit on the Dask workers.
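One way to spot such a mismatch from the client side is to diff the output of client.get_versions() yourself. A rough sketch (the returned dict has the same structure as the version dump in the question; note it only covers the packages Dask reports, so joblib and scikit-learn still have to be compared with pip list on both sides):

from dask.distributed import Client

client = Client('127.0.0.1:30006', timeout=10000)

versions = client.get_versions()              # same structure as the JSON dump above
client_pkgs = versions['client']['packages']

# Compare each worker's reported package versions against the client's
# and print anything that differs (e.g. lz4 and blosc in the dump above).
for addr, info in versions['workers'].items():
    for pkg, ver in info['packages'].items():
        if client_pkgs.get(pkg) != ver:
            print(f"{addr}: {pkg} worker={ver} client={client_pkgs.get(pkg)}")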
