How to thread multiple subprocess instances in Python 2.7?
Question
I have three commands that would otherwise be easily chained together on the command-line like so:
$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput
In other words, the firstCommand processes foo from standard input and pipes the result to secondCommand, which in turn processes that input and pipes its output to thirdCommand, which does its own processing and redirects its output to the file finalOutput.
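For comparison, the shell pipeline alone can be reproduced in Python with no threads at all, by wiring each process's stdout directly into the next process's stdin. This is a minimal sketch: since firstCommand, secondCommand, and thirdCommand are placeholders, it substitutes tr and cat purely so the example runs anywhere.

```python
import subprocess

# Reproduce: echo foo | cmd1 | cmd2 | cmd3, with stand-in commands
# (tr, tr, cat) in place of the placeholder firstCommand etc.
p1 = subprocess.Popen(["tr", "a-z", "A-Z"],
                      stdin=subprocess.PIPE, stdout=subprocess.PIPE)
p2 = subprocess.Popen(["tr", "O", "X"],
                      stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(["cat"],
                      stdin=p2.stdout, stdout=subprocess.PIPE)
# close the parent's copies so each upstream process can see SIGPIPE/EOF
p1.stdout.close()
p2.stdout.close()

p1.stdin.write(b"foo\n")
p1.stdin.close()
out = p3.communicate()[0]   # b"FXX\n": foo -> FOO -> FXX -> FXX
p1.wait()
p2.wait()
```

This direct wiring works when no Python-side processing is needed between stages; the threads in the question exist only because the output must be manipulated in Python between the commands.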
I have been trying to recapitulate this in a Python script, using threading. I'd like to use Python in order to manipulate the output from firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand.
Here's an excerpt of code that does not seem to work:
import sys
import subprocess
import threading

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()
When I run this, the script hangs:
$ echo foo | ./myConversionScript.py
** hangs here... **
If I hit Ctrl-C to terminate the script, the code is stuck on the line third_thread.join():
C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt
If I don't use a third_process and third_thread, instead only passing data from the output of the first thread to the input of the second thread, there is no hang.

Something about the third thread seems to cause things to break, but I don't know why.
I thought the point of communicate() is that it will handle I/O for the three processes, so I'm not sure why there is an I/O hang.

How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?
Update
Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait() for completion, and within the thread methods, I close() the pipes once the thread has processed all the data that it can. My concern is that memory usage will be very high for large datasets, but at least things are working:
import sys
import subprocess
import threading

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()
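The close() calls are what make this version terminate: a reader on a pipe only sees end-of-file once every writer has closed its end, so without them the downstream readline() blocks forever. A tiny self-contained illustration of that rule, independent of the commands above:

```python
import os
import threading

# A reader on a pipe only sees EOF after every writer has closed it,
# which is why the thread methods above must close() their streams.
r, w = os.pipe()
reader = os.fdopen(r, "rb")
writer = os.fdopen(w, "wb")

def produce():
    writer.write(b"hello\n")
    writer.flush()
    writer.close()  # without this close, readlines() below would block forever

t = threading.Thread(target=produce)
t.start()
lines = reader.readlines()  # returns only once EOF is reached
t.join()
reader.close()
```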
Answer
To emulate:
echo foo |
firstCommand - | somePythonRoutine - |
secondCommand - | anotherPythonRoutine - |
thirdCommand - > finalOutput
your current approach with threads works:
from subprocess import Popen, PIPE

first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
bind(first.stdout, second.stdin, somePythonRoutine)
with open("finalOutput", "wb") as file:
    third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1)
    bind(second.stdout, third.stdin, anotherPythonRoutine)

    # provide input for the pipeline
    first.stdin.write(b"foo")
    first.stdin.close()

    # wait for it to complete
    pipestatus = [p.wait() for p in [first, second, third]]
where each bind() starts a new thread:
from threading import Thread

def bind(input_pipe, output_pipe, line_filter):
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line)  # no flush unless newline present
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True  # die if the program exits
    t.start()
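To see bind() in action without the placeholder commands, here is a self-contained run (repeating the bind() definition so the snippet stands alone) that uses cat as a stand-in for both external commands and an upper-casing lambda as the filter:

```python
from subprocess import Popen, PIPE
from threading import Thread

def bind(input_pipe, output_pipe, line_filter):
    # same bind() as in the answer: pump filtered lines between two pipes
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line)
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True
    t.start()

# cat stands in for firstCommand/secondCommand; the filter upper-cases lines
first = Popen(["cat"], stdin=PIPE, stdout=PIPE)
second = Popen(["cat"], stdin=PIPE, stdout=PIPE)
bind(first.stdout, second.stdin, lambda line: line.upper())

first.stdin.write(b"foo\nbar\n")
first.stdin.close()          # EOF propagates: first exits -> bind closes second.stdin
out = second.stdout.read()   # b"FOO\nBAR\n"
for p in (first, second):
    p.wait()
```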
and somePythonRoutine, anotherPythonRoutine accept a single line and return it (possibly modified).