blocks - send input to python subprocess pipeline

Problem description

I'm testing subprocesses pipelines with python. I'm aware that I can do what the programs below do in python directly, but that's not the point. I just want to test the pipeline so I know how to use it.

My system is Linux Ubuntu 9.04 with default python 2.6.

I started with this documentation example.

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

That works, but since p1's stdin is not being redirected, I have to type stuff in the terminal to feed the pipe. When I type ^D closing stdin, I get the output I want.

However, I want to send data to the pipe using a python string variable. First I tried writing on stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Didn't work. I tried using p2.stdout.read() on the last line instead, but it also blocks. I added p1.stdin.flush() and p1.stdin.close(), but that didn't work either. Then I moved on to communicate():

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

So that's still not it.

I noticed that running a single process (like p1 above, removing p2) works perfectly. And passing a file handle to p1 (stdin=open(...)) also works. So the problem is:

Is it possible to pass data to a pipeline of 2 or more subprocesses in python, without blocking? Why not?

I'm aware I could run a shell and run the pipeline in the shell, but that's not what I want.
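
Just for reference, the shell-based version I'm trying to avoid would look something like the sketch below, where /bin/sh does all the piping:

from subprocess import Popen, PIPE

# Let the shell wire up the pipe instead of doing it in Python.
p = Popen("grep -v not | cut -c 1-10", shell=True, stdin=PIPE, stdout=PIPE)
output = p.communicate('test\n')[0]
print output

But I want to wire the pipes up in Python myself.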


UPDATE 1: Following Aaron Digulla's hint below I'm now trying to use threads to make it work.

First I've tried running p1.communicate on a thread.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Okay, that didn't work. I tried other combinations, like changing it to .write() and also p2.read(). Nothing. Now let's try the opposite approach:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

The code ends up blocking somewhere, either in the spawned thread, in the main thread, or both. So it didn't work. If you know how to make this work, it would be easier if you could provide working code. I'm still trying here.


UPDATE 2

Paul Du Bois answered below with some information, so I did more tests. I've read the entire subprocess.py module and understood how it works, so I tried applying exactly that to my code.

I'm on Linux, but since I was testing with threads, my first approach was to replicate the exact Windows threading code seen in subprocess.py's communicate() method, but for two processes instead of one. Here's the entire listing of what I tried:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Well. It didn't work. Even after p1.stdin.close() was called, p2.stdout.read() still blocks.

Then I tried the posix code on subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

This one also blocks, on select.select(). By spreading prints around, I found out the following:

  • Reading is working. The code reads many times during execution.
  • Writing is also working. Data gets written to p1.stdin.
  • At the end of numwrites, p1.stdin.close() is called.
  • When select() starts blocking, only to_read has something in it (p2.stdout); to_write is already empty.
  • The os.read() call always returns something, so p2.stdout.close() is never called.

Conclusion from both tests: closing the stdin of the first process in the pipeline (grep in the example) does not make it flush its buffered output to the next process and exit.

No way to make it work?

PS: I don't want to use a temporary file; I've already tested with files and I know that works. And I don't want to use Windows.

Solution

I found out how to do it.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call those a and b.

When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen: close_fds=False.

The effect of that is that cut also inherits a. So grep can't die even when I close a, because its stdin is still held open by cut's process (cut just ignores it).

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on Unix systems. On Windows it closes all fds, so it prevents piping.

EDIT:

PS: For people with a similar problem reading this answer: as pooryorick said in a comment, this could also block if the data written to p1.stdin is bigger than the pipe buffers. In that case you should chunk the data into smaller pieces and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.
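
A rough, untested sketch of that idea (the helper name and the 1024-byte chunk size are mine, not from the original code) could look like this; it still relies on close_fds=True:

import os
import select
from subprocess import Popen, PIPE

def feed_pipeline(data, chunk_size=1024):
    # Hypothetical helper: push `data` through grep | cut without deadlocking.
    p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
    p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

    output = []
    to_write = [p1.stdin]
    to_read = [p2.stdout]
    pos = 0

    while to_read or to_write:
        readable, writable, _ = select.select(to_read, to_write, [])

        if writable:
            if pos < len(data):
                # os.write() returns how many bytes were actually accepted.
                pos += os.write(p1.stdin.fileno(), data[pos:pos + chunk_size])
            else:
                p1.stdin.close()  # EOF for grep; with close_fds=True it actually sees it
                to_write = []

        if readable:
            chunk = os.read(p2.stdout.fileno(), chunk_size)
            if chunk:
                output.append(chunk)
            else:
                p2.stdout.close()
                to_read = []

    return ''.join(output)

print len(feed_pipeline('hello world\n' * 100000))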

EDIT2: Found another solution, with more help from pooryorick: instead of using close_fds=True and closing ALL fds, one can close the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn argument of Popen comes in very handy for doing just that. When executing p2 you can do:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
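
For completeness, a self-contained sketch of that variant; devnull and cmd2 are not defined in the snippet above, so they are filled in here with stand-ins (os.devnull and the same cut command used earlier):

import os
from subprocess import Popen, PIPE

devnull = open(os.devnull, 'w')  # stand-in for the snippet's devnull

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
# preexec_fn runs in the child between fork() and exec(), so closing p1.stdin
# there keeps cut from inheriting the write end of grep's stdin pipe.
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           stderr=devnull, preexec_fn=p1.stdin.close)

p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"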
