如何在 PyQt5 中排队 QProcesses? [英] How do I queue QProcesses in PyQt5?
问题描述
我想在 PyQt5 中排队 QProcess 或者只是阻塞,同时仍然使用 readAll() 读取标准输出.相当于 subprocess.call 而不是 subprocess.Popen.使用 waitForFinished() 时,通过 readAll() 读取的 stdout 会在进程结束时一次性全部出现,而不是在处理过程中逐步流出.
I want to queue QProcesses in PyQt5, or simply block, while still reading the stdout with readAll() - the equivalent of subprocess.call instead of subprocess.Popen. When using waitForFinished(), the stdout read with readAll() arrives all at once when the process ends instead of streaming out while it is running.
示例脚本:
from PIL import Image
import numpy as np
import sys
from PyQt5 import QtGui,QtCore, QtWidgets
class gui(QtWidgets.QMainWindow):
    """Main window with a Run button and a text box that shows process output."""

    def __init__(self):
        super(gui, self).__init__()
        self.initUI()

    def dataReady(self):
        """Append the output currently available from the process to the text box."""
        cursor = self.output.textCursor()
        cursor.movePosition(cursor.End)
        cursor.insertText(str(self.process.readAll(), "utf-8"))
        self.output.ensureCursorVisible()

    def callProgram(self):
        """Launch the external commands and edit the image between them.

        NOTE: QProcess.start() returns without waiting for the command to
        finish, so each image step below runs right after the launch request
        and the later start() calls are issued on the same QProcess object.
        That missing sequencing is the problem this example illustrates.
        """
        # `start` takes the executable and a list of arguments.
        # Raw string: the backslash in the Windows path is kept literally.
        filepath = r'path\image.tif'
        self.process.start('some_command', [filepath])

        # The command is expected to write image.tif at `filepath`.
        # Import the file and modify it:
        im = Image.open(filepath)
        imarray = np.array(im)
        # Get image extents as argument to next process:
        ext = str(imarray.size)
        imarray[imarray == 10] = 5
        # Save changes
        im = Image.fromarray(imarray)
        im.save(filepath)

        # Now the image has been updated and is handed to the next command;
        # path and extent are passed as two separate arguments.
        self.process.start('some_other_command', [filepath, ext])

        # Same thing goes on here:
        self.process.start('some_command', [filepath])
        # Import the file once again and rebuild the array from it
        im = Image.open(filepath)
        imarray = np.array(im)
        imarray[imarray == 10] = 5
        # Save changes
        im = Image.fromarray(imarray)
        im.save(filepath)

    def initUI(self):
        """Create the widgets and the QProcess, and connect their signals."""
        layout = QtWidgets.QHBoxLayout()
        self.runButton = QtWidgets.QPushButton('Run')
        self.runButton.clicked.connect(self.callProgram)
        self.output = QtWidgets.QTextEdit()
        layout.addWidget(self.output)
        layout.addWidget(self.runButton)
        centralWidget = QtWidgets.QWidget()
        centralWidget.setLayout(layout)
        self.setCentralWidget(centralWidget)

        # QProcess object for the external application
        self.process = QtCore.QProcess(self)
        # QProcess emits `readyRead` when there is data to be read
        self.process.readyRead.connect(self.dataReady)
        # Prevent accidental re-runs: disable the button while the process
        # runs and enable it again when the process finishes.
        self.process.started.connect(lambda: self.runButton.setEnabled(False))
        self.process.finished.connect(lambda: self.runButton.setEnabled(True))
# Application entry point
def main():
    """Create the Qt application, show the main window and run the event loop."""
    application = QtWidgets.QApplication(sys.argv)
    window = gui()
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)


# Run the GUI only when this file is executed as a script
if __name__ == '__main__':
    main()
推荐答案
这种情况下的解决方案是创建一个 TaskManager
类,负责处理任务之间的顺序性.
The solution in this case is to create a TaskManager
class that is responsible for handling the sequentiality between the tasks.
import sys
from PyQt5 import QtCore, QtWidgets
from functools import partial
class TaskManager(QtCore.QObject):
    """Run a sequence of external commands one after another on one QProcess.

    Signals:
        started: emitted once when a batch of tasks is launched.
        finished: emitted once after the last task of the batch has ended
            (immediately if the batch is empty).
        progressChanged(int, QByteArray): emitted after each task with the
            number of tasks completed so far and that task's standard output.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal()
    progressChanged = QtCore.pyqtSignal(int, QtCore.QByteArray)

    def __init__(self, parent=None):
        QtCore.QObject.__init__(self, parent)
        self._process = QtCore.QProcess(self)
        self._process.finished.connect(self.handleFinished)
        # `finished` is not emitted for a program that cannot be launched, so
        # that case is handled separately to keep the queue moving.
        self._process.errorOccurred.connect(self.handleError)
        self._tasks = iter(())
        self._progress = 0

    def start_tasks(self, tasks):
        """Start running `tasks`, an iterable of (program, arguments) tuples."""
        self._tasks = iter(tasks)
        # Reset the counter before anything is launched.
        self._progress = 0
        self.started.emit()
        if not self.fetchNext():
            # Nothing to run: still report completion so listeners that
            # reacted to `started` (e.g. a disabled button) are released.
            self.finished.emit()

    def fetchNext(self):
        """Launch the next queued task; return False when the queue is empty."""
        try:
            task = next(self._tasks)
        except StopIteration:
            return False
        self._process.start(*task)
        return True

    def processCurrentTask(self):
        """Collect the finished task's stdout and publish the new progress."""
        output = self._process.readAllStandardOutput()
        self._progress += 1
        self.progressChanged.emit(self._progress, output)

    def handleError(self, error):
        """Treat a program that failed to start as a finished task."""
        if error == QtCore.QProcess.FailedToStart:
            # Deferred to the event loop so the next start() is not issued
            # from inside the failing start() call of the same QProcess.
            QtCore.QTimer.singleShot(0, self.handleFinished)

    def handleFinished(self):
        """Report the task that just ended, then run the next one or finish."""
        self.processCurrentTask()
        if not self.fetchNext():
            self.finished.emit()
class gui(QtWidgets.QMainWindow):
    """Window that queues commands on a TaskManager and shows output and progress."""

    def __init__(self):
        super(gui, self).__init__()
        self.initUI()

    def dataReady(self, progress, result):
        """Append one task's output to the text box and update the progress bar."""
        text = str(result, "utf-8")
        self.output.append(text)
        self.progressBar.setValue(progress)

    def callProgram(self):
        """Queue three identical ping commands and size the progress bar to match."""
        tasks = [("ping", ["8.8.8.8"]) for _ in range(3)]
        self.progressBar.setMaximum(len(tasks))
        self.manager.start_tasks(tasks)

    def initUI(self):
        """Create the widgets, stack them vertically and connect the task manager."""
        self.output = QtWidgets.QTextEdit()
        self.runButton = QtWidgets.QPushButton('Run')
        self.progressBar = QtWidgets.QProgressBar()
        self.runButton.clicked.connect(self.callProgram)

        layout = QtWidgets.QVBoxLayout()
        for widget in (self.output, self.runButton, self.progressBar):
            layout.addWidget(widget)

        centralWidget = QtWidgets.QWidget()
        centralWidget.setLayout(layout)
        self.setCentralWidget(centralWidget)

        # The button is disabled while a batch runs and re-enabled afterwards.
        self.manager = TaskManager(self)
        self.manager.progressChanged.connect(self.dataReady)
        self.manager.started.connect(partial(self.runButton.setEnabled, False))
        self.manager.finished.connect(partial(self.runButton.setEnabled, True))
def main():
    """Start the Qt application, display the window and exit with the loop's status."""
    application = QtWidgets.QApplication(sys.argv)
    window = gui()
    window.show()
    status = application.exec_()
    sys.exit(status)


if __name__ == '__main__':
    main()
<hr>
更新:
概括这个问题,可以说第n个进程需要默认参数和附加参数.
Generalizing the problem, it can be said that the n-th process needs default arguments and additional arguments.
默认参数是独立且固定的
The default arguments are independent and fixed
附加参数通过某个函数依赖于前面的过程.
The additional arguments depend on the previous process through a certain function.
因此可以使用以下表达式进行概括:
So it can be generalized using the following expressions:
`result_n = process_n(default_arguments, additional_args_n)`
`additional_args_n = fun_n(result_{n-1})`
或使用下图:
________ _________ ________ _________ ________
| | | | | | | | | |
| | | | | | | | | |
| TASK-1 |--->| FUN1TO2 |--->| TASK-2 |--->| FUN2TO3 |--->| TASK-3 |
| | | | | | | | | |
|________| |_________| |________| |_________| |________|
然后为了构建过程,创建了以下字典:
Then to structure the process the following dictionary is created:
task_n = {"program": program, "args": default_arguments, "function": fun}
其中 fun
是用于处理此任务的输出以获得下一个任务的附加参数的函数.
Where fun
is the function used to process the output of this task to obtain additional arguments for the next task.
在下面的示例中,我将使用 scriptX.py 作为程序而不是 ping.
In the following example, I will use scriptX.py as the program instead of ping.
#script1.py
import sys
def foo(*args):
    """Return "1-" followed by the joined items of the single positional argument."""
    (items,) = args
    return "1-" + "".join(items)


# Script body: tag the command-line arguments and print the result.
arg = sys.argv[1:]
print(foo(arg))
#script2.py
import sys
def foo(*args):
    """Return "2-" followed by the joined items of the single positional argument."""
    (items,) = args
    return "2-" + "".join(items)


# Script body: tag the command-line arguments and print the result.
arg = sys.argv[1:]
print(foo(arg))
#script3.py
import sys
def foo(*args):
    """Return "3-" followed by the joined items of the single positional argument."""
    (items,) = args
    return "3-" + "".join(items)


# Script body: tag the command-line arguments and print the result.
arg = sys.argv[1:]
print(foo(arg))
fun1to2
是使用 process-1 的结果生成 process-2 所需的附加参数并且必须返回它的函数.fun2to3 的情况类似.
fun1to2
is the function that uses the result of process-1 to generate the additional argument required by process-2, and it must return that argument. The same applies to fun2to3.
def fun1to2(*args):
    """Return the extra argument for process 2; the received values are ignored."""
    extra_argument = "additional_arg_for_process2_from_result1"
    return extra_argument
def fun2to3(*args):
    """Return the extra argument for process 3; the received values are ignored."""
    extra_argument = "additional_arg_for_process3_from_result2"
    return extra_argument
因此基于上述我们创建任务:
so based on the above we create the tasks:
# One dictionary per process; "function" turns a task's output into the
# additional arguments for the task that follows it (the last task has none).
tasks = [
    {
        "program": "python",
        "args": ["scripts/script1.py", "default_argument1"],
        "function": fun1to2,
    },
    {
        "program": "python",
        "args": ["scripts/script2.py", "default_argument2"],
        "function": fun2to3,
    },
    {
        "program": "python",
        "args": ["scripts/script3.py", "default_argument3"],
    },
]
综合以上,最终实现如下:
Using all of the above, the final implementation is as follows:
import sys
from PyQt5 import QtCore, QtWidgets
from functools import partial
class TaskManager(QtCore.QObject):
    """Run a chain of external commands sequentially on a single QProcess.

    Each task is a dictionary with the keys "program" (executable), "args"
    (list of fixed arguments) and, optionally, "function": a callable that
    receives the task's standard output and returns the additional
    argument(s) to append to the next task's argument list.

    Signals:
        started: emitted once when a batch of tasks is launched.
        finished: emitted once after the last task of the batch has ended
            (immediately if the batch is empty).
        progressChanged(int, QByteArray): emitted after each task with the
            number of tasks completed so far and that task's standard output.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal()
    progressChanged = QtCore.pyqtSignal(int, QtCore.QByteArray)

    def __init__(self, parent=None):
        QtCore.QObject.__init__(self, parent)
        self._process = QtCore.QProcess(self)
        self._process.finished.connect(self.handleFinished)
        # `finished` is not emitted for a program that cannot be launched, so
        # that case is handled separately to keep the queue moving.
        self._process.errorOccurred.connect(self.handleError)
        self._tasks = iter(())
        self._progress = 0
        self._currentTask = None

    def start_tasks(self, tasks):
        """Start running `tasks`, an iterable of task dictionaries."""
        self._tasks = iter(tasks)
        # Reset the counter before anything is launched.
        self._progress = 0
        self.started.emit()
        if not self.fetchNext():
            # Nothing to run: still report completion so listeners that
            # reacted to `started` (e.g. a disabled button) are released.
            self.finished.emit()

    def fetchNext(self, additional_args=None):
        """Launch the next queued task; return False when the queue is empty.

        `additional_args` may be a single string or an iterable of strings;
        it is appended after the task's own arguments.
        """
        try:
            self._currentTask = next(self._tasks)
        except StopIteration:
            return False
        program = self._currentTask.get("program")
        # Work on a copy so the caller's task dictionary is never modified.
        args = list(self._currentTask.get("args") or [])
        if additional_args is not None:
            if isinstance(additional_args, str):
                # A bare string is ONE argument; extending the list with it
                # would split it into individual characters.
                args.append(additional_args)
            else:
                args.extend(additional_args)
        self._process.start(program, args)
        return True

    def processCurrentTask(self):
        """Publish the finished task's stdout and compute the next extra args.

        Returns whatever the task's "function" returns, or None when the task
        has no such function.
        """
        output = self._process.readAllStandardOutput()
        self._progress += 1
        fun = self._currentTask.get("function")
        res = None
        if fun:
            res = fun(output)
        self.progressChanged.emit(self._progress, output)
        return res

    def handleError(self, error):
        """Treat a program that failed to start as a finished task."""
        if error == QtCore.QProcess.FailedToStart:
            # Deferred to the event loop so the next start() is not issued
            # from inside the failing start() call of the same QProcess.
            QtCore.QTimer.singleShot(0, self.handleFinished)

    def handleFinished(self):
        """Report the task that just ended, then run the next one or finish."""
        args = self.processCurrentTask()
        if not self.fetchNext(args):
            self.finished.emit()
def fun1to2(args):
    """Return the extra argument for process 2; `args` (process 1 output) is unused."""
    extra_argument = "-additional_arg_for_process2_from_result1"
    return extra_argument
def fun2to3(args):
    """Return the extra argument for process 3; `args` (process 2 output) is unused."""
    extra_argument = "-additional_arg_for_process3_from_result2"
    return extra_argument
class gui(QtWidgets.QMainWindow):
    """Window that runs the three chained scripts and shows output and progress."""

    def __init__(self):
        super(gui, self).__init__()
        self.initUI()

    def dataReady(self, progress, result):
        """Display one task's output and move the progress bar to `progress`."""
        decoded = str(result, "utf-8")
        self.output.append(decoded)
        self.progressBar.setValue(progress)

    def callProgram(self):
        """Describe the three chained script runs and hand them to the manager."""
        first = {
            "program": "python",
            "args": ["scripts/script1.py", "default_argument1"],
            "function": fun1to2,
        }
        second = {
            "program": "python",
            "args": ["scripts/script2.py", "default_argument2"],
            "function": fun2to3,
        }
        # The last task has no "function": nothing follows it.
        third = {
            "program": "python",
            "args": ["scripts/script3.py", "default_argument3"],
        }
        tasks = [first, second, third]
        self.progressBar.setMaximum(len(tasks))
        self.manager.start_tasks(tasks)

    def initUI(self):
        """Create the widgets, stack them vertically and connect the task manager."""
        self.runButton = QtWidgets.QPushButton('Run')
        self.output = QtWidgets.QTextEdit()
        self.progressBar = QtWidgets.QProgressBar()
        self.runButton.clicked.connect(self.callProgram)

        column = QtWidgets.QVBoxLayout()
        column.addWidget(self.output)
        column.addWidget(self.runButton)
        column.addWidget(self.progressBar)

        container = QtWidgets.QWidget()
        container.setLayout(column)
        self.setCentralWidget(container)

        # The button is disabled while a batch runs and re-enabled afterwards.
        self.manager = TaskManager(self)
        self.manager.progressChanged.connect(self.dataReady)
        self.manager.started.connect(partial(self.runButton.setEnabled, False))
        self.manager.finished.connect(partial(self.runButton.setEnabled, True))
def main():
    """Build the application and its window, then block in the Qt event loop."""
    qt_app = QtWidgets.QApplication(sys.argv)
    main_window = gui()
    main_window.show()
    return_code = qt_app.exec_()
    sys.exit(return_code)


if __name__ == '__main__':
    main()
结果:
这篇关于如何在 PyQt5 中排队 QProcesses?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!