Prevent reading data from an empty FIFO from blocking


Problem description



Within a Python 3 web application, I need to shell out to a command-line utility that processes an image and writes its output to a named pipe (fifo), and then parse that output (the content of the pipe) into a PIL/Pillow Image. Here's the basic flow (working code, so long as there are no errors!):

from os import mkfifo
from os import unlink
from PIL import Image
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import Popen

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some.tif ' + fifo_path
# make a named pipe
mkfifo(fifo_path)
# execute
proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
# parse the image
pillow_image = Image.open(fifo_path)
# finish the process:
proc_exit = proc.wait()
# remove the pipe:
unlink(fifo_path)
# just for proof:
pillow_image.show()

(In the example above I've replaced the utility I actually have to use with ImageMagick, just because you're unlikely to have the real one; it doesn't influence the problem at all.)

This works great in most circumstances, and I can handle most exceptions (left out above for clarity). But there's one case I can't work out how to handle: what to do when something goes wrong in the shellout, leaving the pipe empty, e.g. if the image doesn't exist or is corrupt for some reason:

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some/bad_or_missing.tif ' + fifo_path
# make a named pipe
mkfifo(fifo_path)
# execute
proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
# parse the image
pillow_image = Image.open(fifo_path) # STUCK
...

The application just hangs here, and because I never reach proc_exit = proc.wait() I can't set a timeout (e.g. proc_exit = proc.wait(timeout=2)), which is what I'd normally do.

I've tried wrapping the whole business in a context manager, similar to this answer, but that recipe is not thread safe, which is a problem, and I can't find a threading or multiprocessing solution that gives me access to the PIL/Pillow Image instance when I join the thread or process (not my strong suit, but something like this):

from multiprocessing import Process
from os import mkfifo
from os import unlink
from PIL import Image
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import Popen

def do_it(cmd, fifo_path):
    mkfifo(fifo_path)
    # I hear you like subprocesses with your subprocesses...
    sub_proc = Popen(cmd, stdout=DEVNULL, stderr=PIPE, shell=True)
    pillow_image = Image.open(fifo_path)
    proc_exit = sub_proc.wait()
    unlink(fifo_path)

fifo_path = '/tmp/myfifo.bmp'
cmd = '/usr/bin/convert -resize 100 /path/to/some/bad_or_missing.tif ' + fifo_path
proc = Process(target=do_it, args=(cmd, fifo_path))
proc.daemon = True
proc.start()
proc.join(timeout=3) # I can set a timeout here
# Seems heavy anyway, and how do I get pillow_image back for further work?
pillow_image.show()

Hopefully these illustrate my problem and what I've tried. Thanks in advance.

Solution

POSIX read(2), on attempting to read from an empty pipe or FIFO:

    If no process has the pipe open for writing, read() shall return 0 to indicate end-of-file.
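This rule can be demonstrated with an anonymous pipe, which is the same kernel object as a FIFO minus the filesystem name (a minimal sketch, not part of the question's code):

```python
import os

# Create a pipe, write to it, then close the write end so that no
# process has the pipe open for writing any more.
r, w = os.pipe()
os.write(w, b"data")
os.close(w)

first = os.read(r, 1024)   # buffered data is still delivered...
second = os.read(r, 1024)  # ...then read() returns 0 bytes: end-of-file
os.close(r)
```

Once the last writer closes its end, readers drain whatever is buffered and then see EOF rather than blocking.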

Image.open(fifo_path) can get stuck if, and only if, the command dies without ever opening fifo_path for writing while the reader is blocked opening it.

Normally, opening the FIFO blocks until the other end is opened also.

Here's a normal sequence:

  1. cmd blocks while trying to open fifo_path for writing
  2. your Python code blocks while trying to open for reading
  3. once the FIFO is opened by both processes, the data starts to flow. Except for the name, a FIFO is similar to a pipe: there is exactly one pipe object, and the kernel passes all data internally without writing it to the filesystem. The pipe is not a seekable file, so Image.open() may read until EOF
  4. cmd closes its end of the pipe. Your code receives EOF because no other process has the FIFO opened for writing and Image.open(fifo_path) returns.

    It doesn't matter why cmd's end of the pipe is closed: due to successful completion or due to an error, whether cmd is killed abruptly or not. All that matters is that its end is closed.

    It doesn't matter whether your process calls proc.wait() or not. proc.wait() does not kill cmd, and it does not keep the other end of the pipe from being opened or closed. All proc.wait() does is wait until the child process dies and/or return the exit status of an already dead child.
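The normal sequence above can be sketched end to end with a short-lived writer child. This is a hedged illustration, assuming a POSIX system with sh and printf available; the temporary path is arbitrary:

```python
import os
import subprocess
import tempfile

tmp_dir = tempfile.mkdtemp()
fifo_path = os.path.join(tmp_dir, "demo.fifo")
os.mkfifo(fifo_path)

# Step 1: the child blocks in its open-for-writing (the shell
# redirection) until the other end is opened.
child = subprocess.Popen(["sh", "-c", "printf ok > " + fifo_path])

# Step 2: we block opening for reading until the child opens for
# writing; then both sides unblock and the data flows (step 3).
with open(fifo_path, "rb") as pipe:
    data = pipe.read()  # step 4: EOF once the child closes its end

child.wait()
os.remove(fifo_path)
os.rmdir(tmp_dir)
```

Whichever side opens the FIFO first simply waits for the other; pipe.read() then returns the child's output and hits EOF as soon as the shell's redirection closes the write end.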

Here's the deadlock case:

  1. By the time of the Image.open() call, cmd does not even try to open fifo_path for writing, for whatever reason: e.g., there is no /usr/bin/convert, wrong command-line arguments, wrong or missing input, etc.
  2. your Python code blocks while trying to open for reading

fifo_path is never opened for writing, and therefore Image.open(fifo_path) is stuck forever trying to open it for reading.
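For completeness, the open() call itself (the step that deadlocks here) can be made non-blocking: POSIX allows opening a FIFO read-only with O_NONBLOCK, and a subsequent read() with no writer then reports end-of-file instead of blocking. A minimal sketch, assuming a POSIX system; this doesn't directly help Image.open(), which wants an ordinary blocking file object, but it shows the primitive:

```python
import os
import tempfile

tmp_dir = tempfile.mkdtemp()
fifo_path = os.path.join(tmp_dir, "demo.fifo")
os.mkfifo(fifo_path)

# O_NONBLOCK lets the read-side open() return immediately even though
# no process has the FIFO open for writing.
fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)

# With no process holding the write end open, read() reports
# end-of-file (0 bytes) instead of blocking forever.
data = os.read(fd, 1024)

os.close(fd)
os.remove(fifo_path)
os.rmdir(tmp_dir)
```

A reader built on this would have to poll or select() for data, which is why the answer below takes a different route: keep the blocking open, but guarantee a writer exists.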


You could open the FIFO for writing in a background thread and close it when the parent opens the FIFO for reading:

#!/usr/bin/env python3
import contextlib
import os
import subprocess
import sys
import textwrap
import threading

fifo_path = "fifo"
with contextlib.ExitStack() as stack:
    os.mkfifo(fifo_path)
    stack.callback(os.remove, fifo_path)
    child = stack.enter_context(
        subprocess.Popen([
            sys.executable, '-c', textwrap.dedent('''
            import random
            import sys
            import time
            if random.random() < 0.5: # 50%
                open(sys.argv[1], 'w').write("ok")
            else:
                sys.exit("fifo is not opened for writing in the child")
            '''), fifo_path
        ]))
    stack.callback(child.kill)
    opened = threading.Event()  # set when the FIFO is opened for reading
    threading.Thread(target=open_for_writing,  # defined below
                     args=[fifo_path, opened, child],
                     daemon=True).start()
    pipe = stack.enter_context(open(fifo_path))  # open for reading
    opened.set()  # the background thread may close its end of the pipe now
    print(pipe.read()) # read data from the child or return in 3 seconds
sys.exit(child.returncode)

On EOF the with block exits, and the stack.callback(child.kill) registered above ensures the child is killed.

Here open_for_writing() opens the FIFO in order to unblock the open(fifo_path) call in the main thread, which in turn makes it possible to close the FIFO. To avoid pipe.read() returning too soon, it gives the child 3 seconds to open the FIFO for writing:

def open_for_writing(path, opened, child):
    with open(path, 'w'):
        opened.wait()  # don't close until opened for reading in the main thread
        try:
            child.wait(timeout=3)  # the child has 3 seconds to open for writing
        except subprocess.TimeoutExpired:
            pass

If you are sure that the child process either tries to open the FIFO or exits eventually (or you are OK with the Python process hanging while the child runs), then you can drop the timeout and use child.wait() instead of child.wait(timeout=3). With that change there are no arbitrary timeouts left, and the code can work even on an arbitrarily slow system.

The code demonstrates why threads should be avoided where possible, and why one should prefer established patterns (less general, but guaranteed to work correctly) such as synchronization via communication.

The code in the answer should work in a variety of cases, but its parts are intricately entangled. The effect of even a small change might not become apparent until a very specific case materializes.
