Subprocess.Popen:将 stdout 和 stderr 克隆到终端和变量 [英] Subprocess.Popen: cloning stdout and stderr both to terminal and variables
问题描述
是否可以修改下面的代码以从'stdout'和'stderr'打印输出:
Is it possible to modify code below to have printout from 'stdout 'and 'stderr':
- 打印在终端(实时),
- 最后存储在 outs 和 errs 变量中?
- printed on the terminal (in real time),
- and finally stored in outs and errs variables?
代码:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import subprocess
def run_cmd(command, cwd=None):
p = subprocess.Popen(command, cwd=cwd, shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outs, errs = p.communicate()
rc = p.returncode
outs = outs.decode('utf-8')
errs = errs.decode('utf-8')
return (rc, (outs, errs))
感谢@unutbu,特别感谢@j-f-sebastian,最终功能:
Thanks to @unutbu, special thanks for @j-f-sebastian, final function:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
def read_output(pipe, funcs):
for line in iter(pipe.readline, b''):
for func in funcs:
func(line.decode('utf-8'))
pipe.close()
def write_output(get):
for line in iter(get, None):
sys.stdout.write(line)
def run_cmd(command, cwd=None, passthrough=True):
outs, errs = None, None
proc = Popen(
command,
cwd=cwd,
shell=False,
close_fds=True,
stdout=PIPE,
stderr=PIPE,
bufsize=1
)
if passthrough:
outs, errs = [], []
q = Queue()
stdout_thread = Thread(
target=read_output, args=(proc.stdout, [q.put, outs.append])
)
stderr_thread = Thread(
target=read_output, args=(proc.stderr, [q.put, errs.append])
)
writer_thread = Thread(
target=write_output, args=(q.get,)
)
for t in (stdout_thread, stderr_thread, writer_thread):
t.daemon = True
t.start()
proc.wait()
for t in (stdout_thread, stderr_thread):
t.join()
q.put(None)
outs = ' '.join(outs)
errs = ' '.join(errs)
else:
outs, errs = proc.communicate()
outs = '' if outs == None else outs.decode('utf-8')
errs = '' if errs == None else errs.decode('utf-8')
rc = proc.returncode
return (rc, (outs, errs))
推荐答案
您可以生成线程来读取 stdout 和 stderr 管道、写入公共队列并附加到列表.然后使用第三个线程打印队列中的项目.
You could spawn threads to read the stdout and stderr pipes, write to a common queue, and append to lists. Then use a third thread to print items from the queue.
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE
def read_output(pipe, funcs):
for line in iter(pipe.readline, ''):
for func in funcs:
func(line)
# time.sleep(1)
pipe.close()
def write_output(get):
for line in iter(get, None):
sys.stdout.write(line)
process = subprocess.Popen(
['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
t.daemon = True
t.start()
process.wait()
for t in (tout, terr):
t.join()
q.put(None)
print(out)
print(err)
使用第三个线程的原因——而不是让前两个线程都直接打印到终端——是为了防止两个打印语句同时发生,这有时会导致文本乱码.
The reason for using the third thread -- instead of letting the first two threads both print directly to the terminal -- is to prevent both print statements from occurring concurrently, which can result in sometimes garbled text.
上面调用了random_print.py
,它随机打印到stdout和stderr:
The above calls random_print.py
, which prints to stdout and stderr at random:
import sys
import time
import random
for i in range(50):
f = random.choice([sys.stdout,sys.stderr])
f.write(str(i)+'
')
f.flush()
time.sleep(0.1)
<小时>
该解决方案借鉴了 J. 的代码和想法.F. 塞巴斯蒂安,在这里.
这里是类 Unix 系统的替代解决方案,使用 select.select
:
Here is an alternative solution for Unix-like systems, using select.select
:
import collections
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE
def make_async(fd):
# https://stackoverflow.com/a/7730201/190597
'''add the O_NONBLOCK flag to a file descriptor'''
fcntl.fcntl(
fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
def read_async(fd):
# https://stackoverflow.com/a/7730201/190597
'''read some data from a file descriptor, ignoring EAGAIN errors'''
# time.sleep(1)
try:
return fd.read()
except IOError, e:
if e.errno != errno.EAGAIN:
raise e
else:
return ''
def write_output(fds, outmap):
for fd in fds:
line = read_async(fd)
sys.stdout.write(line)
outmap[fd.fileno()].append(line)
process = subprocess.Popen(
['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)
make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
write_output(rlist, outmap)
if process.poll() is not None:
write_output([process.stdout, process.stderr], outmap)
break
fileno = {'stdout': process.stdout.fileno(),
'stderr': process.stderr.fileno()}
print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])
此解决方案使用来自Adam Rosenfield 的帖子,此处的代码和想法.
This solution uses code and ideas from Adam Rosenfield's post, here.
这篇关于Subprocess.Popen:将 stdout 和 stderr 克隆到终端和变量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!