"EOF错误";在程序退出时使用多处理队列和线程 [英] "EOF error" at program exit using multiprocessing Queue and Thread

Problem description

I have trouble understanding why this simple program raises an EOFError when it exits.

I am using a Queue() to communicate with a Thread() that I want to terminate automatically and cleanly when my program exits, using atexit.

import threading
import multiprocessing
import atexit

class MyClass:

    def __init__(self):
        self.queue = None
        self.thread = None

    def start(self):
        self.queue = multiprocessing.Queue()
        self.thread = threading.Thread(target=self.queued_writer, daemon=True)
        self.thread.start()

        # Remove this: no error
        self.queue.put("message")

    def queued_writer(self):
        while 1:
            msg = self.queue.get()
            print("Message:", msg)
            if msg is None:
                break

    def stop(self):
        self.queue.put(None)
        self.thread.join()

instance = MyClass()

atexit.register(instance.stop)

# Put this before register: no error
instance.start()

This raises:

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 21, in queued_writer
    msg = self.queue.get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError

Moreover, this snippet behaves strangely: if I remove the self.queue.put("message") line, no error is raised and the thread exits successfully. Similarly, it seems to work if instance.start() is called before atexit.register().

Does anyone know where the error comes from?

Edit: I noticed that using a SimpleQueue() seems to make the error disappear.

Solution

The issue comes from a conflict between multiple atexit.register() calls.

The documentation states that:

atexit runs these functions in the reverse order in which they were registered; if you register A, B, and C, at interpreter termination time they will be run in the order C, B, A.

[...]

The assumption is that lower level modules will normally be imported before higher level modules and thus must be cleaned up later.
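
The reverse-order rule is easy to check in isolation. The following is a minimal sketch, not taken from the question: it runs a short child script in a separate interpreter so that its exit handlers actually fire, then reads back what the child printed. The names CHILD_CODE, make_printer and exit_order are illustrative only.

```python
import subprocess
import sys

# Child script: registers three handlers in the order A, B, C.
CHILD_CODE = """
import atexit

def make_printer(name):
    def handler():
        print(name)
    return handler

for name in ("A", "B", "C"):
    atexit.register(make_printer(name))
"""

# Run the child in a fresh interpreter and capture what its exit handlers print.
result = subprocess.run(
    [sys.executable, "-c", CHILD_CODE],
    capture_output=True,
    text=True,
    check=True,
)
exit_order = result.stdout.split()
print(exit_order)  # handlers run last-registered first: ['C', 'B', 'A']
```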

By first importing multiprocessing and then calling atexit.register(my_stop), you would expect your stop function to be executed before any internal termination procedure. But this is not the case, because atexit.register() can be called dynamically, at any point while the program runs, and not only when a module is first imported.

In the present case, the multiprocessing library uses an internal function, _exit_function, which is meant to cleanly close internal threads and queues. This function is registered with atexit at module level, but the module that registers it is only loaded once the Queue() object is created.

Consequently, the stop method of MyClass is registered before multiprocessing's handler, and thus instance.stop is called after _exit_function.
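
The lazy loading described above can be observed from the outside. This is a rough sketch under one assumption that is not stated in the answer: that _exit_function is defined and registered in the multiprocessing.util submodule, which is how CPython lays it out as far as I know. The child script reports whether that submodule is loaded after a plain import multiprocessing, and again after importing multiprocessing.queues. The first value may differ between Python versions, so it is printed rather than relied upon.

```python
import subprocess
import sys

# Child script: check sys.modules at two points in time.
CHILD_CODE = """
import sys
import multiprocessing
print("multiprocessing.util" in sys.modules)
import multiprocessing.queues
print("multiprocessing.util" in sys.modules)
"""

result = subprocess.run(
    [sys.executable, "-c", CHILD_CODE],
    capture_output=True,
    text=True,
    check=True,
)
loaded_after_plain_import, loaded_after_queues_import = result.stdout.split()

print("util loaded after `import multiprocessing`:", loaded_after_plain_import)
print("util loaded after `import multiprocessing.queues`:", loaded_after_queues_import)
```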

During its termination, _exit_function closes the internal pipe connections, so if the thread later tries to call .get() on a closed read connection, an EOFError is raised. This happens only if Python did not have time to automatically kill the daemon thread at the end, that is, if a "slow" exit function (like time.sleep(0.1) or, in this case, thread.join()) is registered and runs after the usual closure procedure. For some reason, the shutdown of the write connection is delayed, so .put() does not raise an error immediately.
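
To see how a multiprocessing connection ends up raising EOFError, here is a small standalone sketch. It does not reproduce the exact sequence inside _exit_function. It only shows the documented behaviour of a Connection: receiving raises EOFError when nothing is left to read and the other end has been closed. The variable names are illustrative.

```python
import multiprocessing

# One-way pipe: `reader` can only receive, `writer` can only send.
reader, writer = multiprocessing.Pipe(duplex=False)

writer.send_bytes(b"message")
writer.close()  # simulate the other end of the pipe being shut down

# Data sent before the close is still delivered.
first = reader.recv_bytes()
print(first)

try:
    reader.recv_bytes()  # nothing left to read and the other end is closed
except EOFError:
    got_eof = True
else:
    got_eof = False
finally:
    reader.close()

print("EOFError raised:", got_eof)
```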

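As to why small modifications to the snippet make it work: SimpleQueue does not have a finalizer, so its internal pipe is closed later. The internal thread of Queue is not started until the first .put() is called, so removing that call means there is no pipe to close. Calling instance.start() before atexit.register() creates the Queue() first, so _exit_function is registered before instance.stop and therefore runs after it. It is also possible to force the registration by importing multiprocessing.queues.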
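
Following the last remark, one possible arrangement imports multiprocessing.queues before the atexit.register() call, so that the exit handler of multiprocessing is registered first and therefore runs last. This is a sketch of the idea rather than the answer author's code. The received list is a hypothetical addition whose only purpose is to make the result observable.

```python
import atexit
import multiprocessing
import multiprocessing.queues  # imported early on purpose: registers multiprocessing's exit handler now
import threading


class MyClass:

    def __init__(self):
        self.queue = None
        self.thread = None
        self.received = []  # hypothetical: records what the writer thread saw

    def start(self):
        self.queue = multiprocessing.Queue()
        self.thread = threading.Thread(target=self.queued_writer, daemon=True)
        self.thread.start()
        self.queue.put("message")

    def queued_writer(self):
        while True:
            msg = self.queue.get()
            if msg is None:  # sentinel: stop the thread
                break
            self.received.append(msg)
            print("Message:", msg)

    def stop(self):
        self.queue.put(None)
        self.thread.join()


instance = MyClass()

# Registered after multiprocessing's handler, so it runs before it at exit.
atexit.register(instance.stop)

instance.start()
```

With this ordering, instance.stop should get the chance to drain the queue and join the thread while the internal pipe is still open.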
