使用字典类型时多处理时出现EOF错误? [英] EOF error in multiprocessing while using dictionary type?

查看:15
本文介绍了使用字典类型时多处理时出现EOF错误?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有下面这段代码,它完全可以执行我想要的操作。但是,对于文件大小较大的文件,它会中断并给我一个错误:

错误:";引发EOFError;

def co(operation, in_queue, processed_lines):
    while True:
        item = in_queue.get()
        line_number, line = item

        if line is None:
            return

        line = line + operation + "changed"
        processed_lines[line_number] = line

def _fo(file_name, operation):

    manager = Manager()
    results = manager.dict()
    work = manager.Queue(10)

    pool = []
    for i in range(10):
        p = Process(target=co, args=(operation, work, results))
        p.start()
        pool.append(p)

    with open(file_name) as f:
        num_lines = 0
        iters = itertools.chain(f, (None,) * 10)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)
                num_lines += 1

    for p in pool:
        p.join()

    return [results[idx] for idx in range(num_lines - 10)]

现在我知道我的主进程在我的多进程可以写回结果之前关闭,但是我无法解析它。我已经使用p.Join()优雅地结束了我的进程。我尝试将p.CLOSE()放在p.Join()之前,但出现错误:‘Process’对象没有属性‘Close’。

我可以在这里做些什么来解决这个问题?

错误:

2020-10-01T15:55:22.488-05:00   item = in_queue.get()

2020-10-01T15:55:22.488-05:00   File "<string>", line 2, in get

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod

2020-10-01T15:55:22.488-05:00   kind, result = conn.recv()

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv

2020-10-01T15:55:22.488-05:00   buf = self._recv_bytes()

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes

2020-10-01T15:55:22.488-05:00   buf = self._recv(4)

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 383, in _recv

2020-10-01T15:55:22.488-05:00   raise EOFError

2020-10-01T15:55:22.488-05:00   EOFError

推荐答案

现在我意识到我的主进程在我的多进程之前关闭 我可以写回结果,但我无法解决它。

因此,这似乎不是输入文件大小的问题,而是一些更大的要处理的事情,只是需要更多的时间和您是否愿意提前终止主程序?或者出于某种原因,您选择在处理完成之前退出主进程。听起来,在您继续前进并退出之前,您需要一种额外的适当取消形式来阻止工人。

我已经使用p.Join()正常关闭了我的进程。

Process.join()实际上并不是您的主进程的优雅关闭。它只是意味着特定的作用域正在阻塞,直到您的工作进程列表选择终止。如果出于任何原因,您使用KeyboardInterrupt终止您的应用程序,或者在此应用程序在另一个线程中运行时通知您的主线程退出,则在尝试从父进程读取更多工作项时,您的主线程将终止,并且您的子进程将遇到EOF。

主进程和子进程的实现设置为向所有工作进程发送队列上的None值,以通知它们退出,然后在主进程中解除所有Process.join()调用的阻塞。如果您没有在该点之前向每个Worker发送None并退出Main,则可能会遇到EOF问题,因为Worker尚未停止。

我尝试将p.Close()放在p.Join()之前,但出现错误:";‘process’对象没有属性‘Close’";。

https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.close
版本3.7中的新功能(&Q;)。

这意味着您正在运行的是低于3.7的Python版本。但是,如果您的主进程要提前退出,则可以改为调用terminatekill。最好停止向工作进程发送线路,并发送最终None值以使工作进程正常停止,然后使用join()调用来等待它们。

with open(file_name) as f:
    num_lines = 0
    iters = itertools.chain(f, (None,) * 10)
    for num_and_line in enumerate(iters):
        work.put(num_and_line)
            num_lines += 1

这段代码在每一行上迭代,将其发送到队列,最后为每个Worker(本例中为10)发送一个None值。如果您决定要取消工作,则需要停止发送行,而是发送10个None值,然后中断。

有关更多详细信息,您需要描述您的取消情况。

这篇关于使用字典类型时多处理时出现EOF错误?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆