blocks - send input to python subprocess pipeline
I'm testing subprocess pipelines with Python. I'm aware that I could do directly in Python what the programs below do, but that's not the point. I just want to test the pipeline so I know how to use it.
My system is Ubuntu Linux 9.04 with the default Python 2.6.
I started with this documentation example.
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
That works, but since p1's stdin is not being redirected, I have to type stuff in the terminal to feed the pipe. When I type ^D, closing stdin, I get the output I want.
However, I want to send data to the pipe using a python string variable. First I tried writing on stdin:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
Didn't work. I tried using p2.stdout.read() on the last line instead, but it also blocks. I added p1.stdin.flush() and p1.stdin.close(), but that didn't work either. Then I moved on to communicate:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
So that's still not it.
I noticed that running a single process (like p1 above, removing p2) works perfectly, and passing a file handle to p1 (stdin=open(...)) also works. So the problem is:
Is it possible to pass data to a pipeline of 2 or more subprocesses in python, without blocking? Why not?
I'm aware I could run a shell and run the pipeline in the shell, but that's not what I want.
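For contrast, the shell route I'm ruling out would look something like this (a sketch; shell=True hands the whole pipeline to the shell, so a single communicate() call feeds it):

```python
from subprocess import Popen, PIPE

# The whole pipeline runs inside one shell process, so one
# communicate() call both feeds it and collects the result.
p = Popen("grep -v not | cut -c 1-10", shell=True, stdin=PIPE, stdout=PIPE)
out = p.communicate(b"test\nnot this line\n")[0]
assert out == b"test\n"
```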
UPDATE 1: Following Aaron Digulla's hint below, I'm now trying to use threads to make it work.
First I tried running p1.communicate on a thread.
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
Okay, that didn't work. I tried other combinations, like changing it to .write() and also using p2.read(). Nothing. Now let's try the opposite approach:
def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
The code ends up blocking somewhere: either in the spawned thread, in the main thread, or both. So it didn't work. If you know how to make it work, it would be easier if you could provide working code. I'm still trying here.
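(Looking back after finding the fd answer at the end: this same writer-thread layout does work once cut stops inheriting p1's stdin. A sketch with close_fds=True, written for Python 3, where pipes carry bytes:)

```python
import threading
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdout.close()  # the parent no longer needs the middle pipe

def feed():
    # runs in a separate thread so the main thread is free to read
    p1.stdin.write(b"some data\n")
    p1.stdin.close()  # real EOF for grep, since cut holds no copy of this fd

t = threading.Thread(target=feed)
t.start()
output = p2.communicate()[0]
t.join()
```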
UPDATE 2
Paul Du Bois answered below with some information, so I did more tests.
I've read the entire subprocess.py module and understood how it works, so I tried applying exactly that to my code.
I'm on Linux, but since I was testing with threads, my first approach was to replicate the exact Windows threading code from subprocess.py's communicate() method, but for two processes instead of one. Here's the entire listing of what I tried:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
Well, it didn't work. Even after p1.stdin.close() was called, p2.stdout.read() still blocked.
Then I tried the POSIX code from subprocess.py:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)
    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b
It also blocks, on select.select(). By spreading prints around, I found out this:
- Reading is working. The code reads many times during execution.
- Writing is also working. Data is written to p1.stdin.
- At the end of numwrites, p1.stdin.close() is called.
- When select() starts blocking, only to_read has something: p2.stdout. to_write is already empty.
- The os.read() call always returns something, so p2.stdout.close() is never called.
Conclusion from both tests: closing the stdin of the first process in the pipeline (grep in the example) does not make it dump its buffered output to the next process and die.
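The underlying rule can be shown with a raw os.pipe and no subprocesses at all: a reader only sees EOF once every copy of the pipe's write end is closed, which is exactly why grep keeps waiting while another process still holds a duplicate of its stdin. A minimal sketch:

```python
import os

r, w = os.pipe()
w2 = os.dup(w)             # a second copy of the write end (what cut inherits)
os.write(w, b"data")
os.close(w)                # closing one copy is not enough for EOF
os.set_blocking(r, False)
extra = os.read(r, 4)      # the buffered data still comes through
try:
    leftover = os.read(r, 4)   # no EOF yet: w2 is still open
except BlockingIOError:
    leftover = None
os.close(w2)               # now the last write end is gone...
os.set_blocking(r, True)
eof = os.read(r, 4)        # ...and the reader finally sees EOF (empty read)
os.close(r)
```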
No way to make it work?
PS: I don't want to use a temporary file, I've already tested with files and I know it works. And I don't want to use windows.
I found out how to do it.
It is not about threads, and not about select().
When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call those a and b.
When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen: close_fds=False.
The effect of that is that cut also inherits a. So grep can't die even if I close a, because stdin is still open in cut's process (cut ignores it).
The following code now runs perfectly.
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds=True SHOULD BE THE DEFAULT on Unix systems. On Windows it closes all fds, so it prevents piping.
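A side note for later readers: from Python 3.2 on, close_fds does default to True on POSIX (and 3.4 additionally makes pipe fds non-inheritable), so the equivalent Python 3 pipeline works without the explicit flag; note that pipes carry bytes there:

```python
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
# close_fds=True is already the default here, so cut does not inherit p1.stdin
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()                 # drop the parent's copy of the middle pipe
p1.stdin.write(b"Hello World\n")  # Python 3 pipes carry bytes
p1.stdin.close()
result = p2.stdout.read()
assert result == b"Hello Worl\n"
```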
EDIT:
PS: For people with a similar problem reading this answer: as pooryorick said in a comment, this could also block if the data written to p1.stdin is bigger than the pipe buffers. In that case you should chunk the data into smaller pieces and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.
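Sketching that suggestion (a hypothetical 100000-line payload, well past a typical 64 KiB pipe buffer; Python 3, so pipes carry bytes):

```python
import os
import select
from subprocess import Popen, PIPE

payload = b"hello world\n" * 100000   # much bigger than the pipe buffer

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE, close_fds=True)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdout.close()                     # parent drops the middle pipe

chunks = []
offset = 0
to_read, to_write = [p2.stdout], [p1.stdin]
while to_read or to_write:
    readable, writable, _ = select.select(to_read, to_write, [])
    if readable:
        data = os.read(p2.stdout.fileno(), 4096)
        if not data:                  # EOF: cut is done
            to_read = []
        else:
            chunks.append(data)
    if writable:
        if offset < len(payload):
            # write at most one small chunk; os.write reports how much fit
            offset += os.write(p1.stdin.fileno(), payload[offset:offset + 4096])
        else:
            p1.stdin.close()          # signal EOF to grep
            to_write = []

result = b"".join(chunks)
```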
EDIT2: Found another solution, with more help from pooryorick: instead of using close_fds=True and closing ALL fds, one can close the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn function from Popen comes in very handy for exactly that. On executing p2 you can do:
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
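Fleshing that line out into a runnable sketch (Python 3 bytes; on modern Pythons the pipe fds are non-inheritable anyway, so preexec_fn here is belt-and-braces, but it shows the technique):

```python
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
# preexec_fn runs in the forked child just before exec, so this closes
# cut's inherited copy of p1.stdin without touching the parent's copy.
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           close_fds=False, preexec_fn=p1.stdin.close)
p1.stdin.write(b"Hello World\n")
p1.stdin.close()
result = p2.stdout.read()
assert result == b"Hello Worl\n"
```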