在一个线程中使用Popen会阻止每一个传入的Flask-SocketIO请求 [英] Using Popen in a thread blocks every incoming Flask-SocketIO request

查看:344
本文介绍了在一个线程中使用Popen会阻止每一个传入的Flask-SocketIO请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下情况:
我在socketio服务器上收到一个请求。我回答它(socket.emit(..))然后然后在另一个线程开始一个负载较重的东西

如果大量计算是由 subprocess.Popen (使用 subprocess.PIPE )引起的,请求,只要它正在执行,尽管它发生在一个单独的线程。



没有问题 - 在这个线程它建议异步读取缓冲区大小为1的子进程的结果,以便在这些读取之间其他线程有机会做某事。不幸的是,这并没有帮助我。

我也已经 subprocess.Popen 和,就可以正常工作, subprocess.PIPE 在线程中。

在这个代码示例中,您可以看到它只发生在 subprocess.Popen subprocess.PIPE 。当取消注释 #functionWithSimulatedHeavyLoad()而改为注释 functionWithHeavyLoad() b

  from flask导入Flask 
from flask.ext.socketio导入SocketIO,发出
导入eventlet

eventlet .monkey_patch()
app = Flask(__ name__)
socketio = SocketIO(app)

从线程导入时间
线程

@ socketio.on('client command')
def response(data,type = None,nonce = None):
socketio.emit('client response',['foo'])
)thread = Thread(target = testThreadFunction)
thread.daemon = True
thread.start()
$ b $ def testThreadFunction():
#functionWithSimulatedHeavyLoad()
functionWithHeavyLoad()
$ b $ def functionWithSimulatedHeavyLoad():
time.sleep(5)

def functionWithHeavyLoad():$ b $ from datetime import datetime
导入子流程
导入sys
from queue import Queue,Empty
$ b ON_POSIX ='posix'in sys.builtin_module_names

def enqueueOutput(out,queue):
for line in iter(out.readline,b''):
if line =='':
break
queue.put(line)
out.close()

#只需要很长的时间来计算
shellCommand ='find / test'

p = subprocess.Popen(shellCommand,universal_newlines = True,shell = True,stdout = subprocess .PIPE,bufsize = 1,close_fds = ON_POSIX)
q =队列()
t =线程(target = enqueueOutput,args =(p.stdout,q))
t.daemon = True
t.start()
t.join()

text =''

而真:
尝试:
line = q.get_nowait()
text + = line
print(line)
除空:
break

socketio.emit('client response ',{' text':text})

socketio.run(app)

客户端在functionWithHeavyLoad()函数中的阻塞工作完成后收到消息'foo'。它应该早些时候收到消息。



这个示例可以被复制并粘贴到一个.py文件中,行为可以被立即复制。



我使用Python 3.4.3,Flask 0.10.1,flask-socketio1.2,eventlet 0.17.4



如果我把它放到functionWithHeavyLoad函数中,它实际上可以正常工作,一切都很好:
$
b $ b

  import shlex 
shellCommand = shlex.split('find / test')

popen = subprocess.Popen(shellCommand,stdout = subprocess.PIPE)

lines_iterator = iter(popen.stdout.readline,b)
用于line_iterator中的行:
print(line)
eventlet。 sleep()

问题是:我用 find tesseract{0}stdout -l deu 作为sell命令。这个(不像 find )仍然会阻塞一切。这是一个 tesseract 问题比eventlet?但仍然:如果这个块如果发生在一个单独的线程中,并且当 find 不会阻塞时,它通过上下文切换逐行读取,怎么能阻止?

解决方案

感谢这个问题,我今天学到了一些新东西。 Eventlet确实提供了一个友好的子程序版本及其功能,但是由于某些奇怪的原因,它并没有将这个模块修改到标准库中。



链接到eventlet实现子过程: https://github.com/eventlet/eventlet/ blob / master / eventlet / green / subprocess.py



查看eventlet patcher ,修补的模块是os,select,socket,线程,时间,MySQLdb,builtins和psycopg2。好的消息是我可以使用 Popen()
$ p $ code $ import $
$ / $ c $来自eventlet.green的


$ b

  import subprocess 

但是请注意,当前发布的eventlet版本(0.17.4)不支持 universal_newlines 选项在 Popen 中,如果你使用它,你会得到一个错误。对此选项的支持在master(这里是添加了该选项的提交)。您将不得不从您的调用中删除该选项,或者从github安装eventlet direct的主分支。


I have the following situation: I receive a request on a socketio server. I answer it (socket.emit(..)) and then start something with heavy computation load in another thread.

If the heavy computation is caused by subprocess.Popen (using subprocess.PIPE) it totally blocks every incoming request as long as it is being executed although it happens in a separate thread.

No problem - in this thread it was suggested to asynchronously read the result of the subprocess with a buffer size of 1 so that between these reads other threads have the chance to do something. Unfortunately this did not help for me.

I also already monkeypatched eventlet and that works fine - as long as I don't use subprocess.Popen with subprocess.PIPE in the thread.

In this code sample you can see that it only happens using subprocess.Popen with subprocess.PIPE. When uncommenting #functionWithSimulatedHeavyLoad() and instead comment functionWithHeavyLoad() everything works like charm.

from flask import Flask
from flask.ext.socketio import SocketIO, emit
import eventlet

eventlet.monkey_patch()
app = Flask(__name__)
socketio = SocketIO(app)

import time
from threading  import Thread

@socketio.on('client command')
def response(data, type = None, nonce = None):
    socketio.emit('client response', ['foo'])
    thread = Thread(target = testThreadFunction)
    thread.daemon = True
    thread.start()

def testThreadFunction():
    #functionWithSimulatedHeavyLoad()
    functionWithHeavyLoad()

def functionWithSimulatedHeavyLoad():
    time.sleep(5)

def functionWithHeavyLoad():
    from datetime import datetime
    import subprocess
    import sys
    from queue import Queue, Empty

    ON_POSIX = 'posix' in sys.builtin_module_names

    def enqueueOutput(out, queue):
        for line in iter(out.readline, b''):
            if line == '':
                break
            queue.put(line)
        out.close()

    # just anything that takes long to be computed
    shellCommand = 'find / test'

    p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX)
    q = Queue()
    t = Thread(target = enqueueOutput, args = (p.stdout, q))
    t.daemon = True
    t.start()
    t.join()

    text = ''

    while True:
        try:
            line = q.get_nowait()
            text += line
            print(line)
        except Empty:
            break

    socketio.emit('client response', {'text': text})

socketio.run(app)

The client receives the message 'foo' after the blocking work in the functionWithHeavyLoad() function is completed. It should receive the message earlier, though.

This sample can be copied and pasted in a .py file and the behavior can be instantly reproduced.

I am using Python 3.4.3, Flask 0.10.1, flask-socketio1.2, eventlet 0.17.4

Update

If I put this into the functionWithHeavyLoad function it actually works and everything's fine:

import shlex
shellCommand = shlex.split('find / test')

popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE)

lines_iterator = iter(popen.stdout.readline, b"")
for line in lines_iterator:
    print(line)
    eventlet.sleep()

The problem is: I used find for heavy load in order to make the sample for you more easily reproducable. However, in my code I actually use tesseract "{0}" stdout -l deu as the sell command. This (unlike find) still blocks everything. Is this rather a tesseract issue than eventlet? But still: how can this block if it happens in a separate thread where it reads line by line with context switch when find does not block?

解决方案

Thanks to this question I learned something new today. Eventlet does offer a greenlet friendly version of subprocess and its functions, but for some odd reason it does not monkey patch this module in the standard library.

Link to the eventlet implementation of subprocess: https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py

Looking at the eventlet patcher, the modules that are patched are os, select, socket, thread, time, MySQLdb, builtins and psycopg2. There is absolutely no reference to subprocess in the patcher.

The good news is that I was able to work with Popen() in an application very similar to yours, after I replaced:

import subprocess

with:

from eventlet.green import subprocess

But note that the currently released version of eventlet (0.17.4) does not support the universal_newlines option in Popen, you will get an error if you use it. Support for this option is in master (here is the commit that added the option). You will either have to remove that option from your call, or else install the master branch of eventlet direct from github.

这篇关于在一个线程中使用Popen会阻止每一个传入的Flask-SocketIO请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆