如何在 PyQt5 中排队 QProcesses? [英] How do I queue QProcesses in PyQt5?

查看:55
本文介绍了如何在 PyQt5 中排队 QProcesses?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在 PyQt5 中让 QProcess 排队执行,或者干脆阻塞等待,同时仍然能用 readAll() 读取标准输出,也就是相当于 subprocess.call 而不是 subprocess.Popen.使用 waitForFinished() 时,readAll() 读到的标准输出会在进程结束时一次性出现,而不是在运行过程中持续输出.

I want to queue QProcess instances in PyQt5, or simply block, while still reading the stdout with readAll(); that is, the equivalent of subprocess.call instead of subprocess.Popen. When using waitForFinished(), the stdout read with readAll() arrives all at once when the process ends instead of flowing out while it is running.

示例脚本:

from PIL import Image
import numpy as np
import sys
from PyQt5 import QtGui,QtCore, QtWidgets

class gui(QtWidgets.QMainWindow):
    """Main window with a Run button and a text area that shows process output."""

    def __init__(self):
        super(gui, self).__init__()
        self.initUI()

    def dataReady(self):
        """Append everything the process has written so far to the text area."""
        cursor = self.output.textCursor()
        cursor.movePosition(cursor.End)
        cursor.insertText(str(self.process.readAll(), "utf-8"))
        self.output.ensureCursorVisible()

    def callProgram(self):
        """Launch the external commands and post-process the image between them.

        NOTE(review): QProcess.start() is asynchronous, so the statements that
        follow each start() call run before the command has produced its
        output. The steps are kept in their original order to illustrate the
        intended sequence.
        """
        # run the process
        # `start` takes the exec and a list of argument

        # Raw string: the backslash is a literal path separator, not an escape.
        filepath = r'path\image.tif'

        self.process.start('some_command filepath')

        # This will output a file image.tif specified by filepath:

        # Import file and do stuff to it:

        # E.g.

        im = Image.open('filepath')

        imarray = np.array(im)

        # Get image extents as argument to next process:

        ext = str(imarray.size)

        imarray[imarray == 10] = 5

        # Save changes

        im = Image.fromarray(imarray)
        im.save(filepath)

        # Now the image has been updated and should be in a new process below

        cmd = 'some_other_command' + filepath + ext

        self.process.start(cmd)

        # Same thing goes on here:

        self.process.start('some_command filepath')

        # Import file once again

        im = Image.open('filepath')

        imarray[imarray == 10] = 5

        # Save changes

        im = Image.fromarray(imarray)
        im.save(filepath)

    def initUI(self):
        """Build the widgets and wire the QProcess signals to the UI."""
        layout = QtWidgets.QHBoxLayout()
        self.runButton = QtWidgets.QPushButton('Run')
        self.runButton.clicked.connect(self.callProgram)

        self.output = QtWidgets.QTextEdit()

        layout.addWidget(self.output)
        layout.addWidget(self.runButton)

        centralWidget = QtWidgets.QWidget()
        centralWidget.setLayout(layout)
        self.setCentralWidget(centralWidget)

        # QProcess object for external app
        self.process = QtCore.QProcess(self)
        # QProcess emits `readyRead` when there is data to be read
        self.process.readyRead.connect(self.dataReady)

        # Just to prevent accidentally running multiple times
        # Disable the button when process starts, and enable it when it finishes
        self.process.started.connect(lambda: self.runButton.setEnabled(False))
        self.process.finished.connect(lambda: self.runButton.setEnabled(True))


def main():
    """Create the Qt application, show the main window and run the event loop."""
    application = QtWidgets.QApplication(sys.argv)
    window = gui()
    window.show()
    # exec_() blocks until the application quits; its result is the exit status.
    exit_code = application.exec_()
    sys.exit(exit_code)


if __name__ == '__main__':
    main()

推荐答案

这种情况下的解决方案是创建一个 TaskManager 类,负责处理任务之间的顺序性.

The solution in this case is to create a TaskManager class that is responsible for handling the sequentiality between the tasks.

import sys

from PyQt5 import QtCore, QtWidgets
from functools import partial

class TaskManager(QtCore.QObject):
    """Run external commands strictly one after another on a single QProcess.

    Signals:
        started: emitted once when a batch of tasks begins.
        finished: emitted once when no task is left to run.
        progressChanged(int, QByteArray): emitted after each task with the
            number of tasks handled so far and that task's standard output.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal()
    progressChanged = QtCore.pyqtSignal(int, QtCore.QByteArray)

    def __init__(self, parent=None):
        QtCore.QObject.__init__(self, parent)
        self._tasks = iter(())
        self._progress = 0
        self._process = QtCore.QProcess(self)
        self._process.finished.connect(self.handleFinished)
        # A program that cannot be launched never emits finished(); without
        # this connection the queue would stall on such a task.
        self._process.errorOccurred.connect(self._handleError)

    def start_tasks(self, tasks):
        """Start running *tasks*, an iterable of ``(program, arguments)`` tuples.

        ``started`` is emitted before the first process is launched. If
        *tasks* is empty, ``finished`` is emitted right away so listeners
        always receive a matching started/finished pair.
        """
        self._tasks = iter(tasks)
        self._progress = 0
        self.started.emit()
        if not self.fetchNext():
            self.finished.emit()

    def fetchNext(self):
        """Launch the next queued task.

        Returns:
            True if a task was launched, False if the queue is exhausted.
        """
        try:
            task = next(self._tasks)
        except StopIteration:
            return False
        self._process.start(*task)
        return True

    def processCurrentTask(self):
        """Collect the ended task's stdout and report it via progressChanged."""
        output = self._process.readAllStandardOutput()
        self._progress += 1
        self.progressChanged.emit(self._progress, output)

    def handleFinished(self):
        """Report the task that just ended, then run the next one or finish."""
        self.processCurrentTask()
        if not self.fetchNext():
            self.finished.emit()

    def _handleError(self, error):
        """Keep the queue moving when a program could not be started.

        Other errors (for example a crash) are followed by QProcess.finished
        and are therefore handled by handleFinished.
        """
        if error == QtCore.QProcess.FailedToStart:
            # Deferred to the event loop: the error may be delivered while
            # QProcess is still inside start(), where restarting is not safe.
            QtCore.QTimer.singleShot(0, self.handleFinished)



class gui(QtWidgets.QMainWindow):
    """Main window: a text log, a Run button and a progress bar fed by TaskManager."""

    def __init__(self):
        super(gui, self).__init__()
        self.initUI()

    def dataReady(self, progress, result):
        """Show one task's output and update the progress bar.

        Args:
            progress: number of tasks handled so far.
            result: raw standard output of the task (QByteArray).
        """
        # "replace" keeps the slot from raising UnicodeDecodeError when a
        # command prints bytes that are not valid UTF-8.
        self.output.append(str(result, "utf-8", "replace"))
        self.progressBar.setValue(progress)

    def callProgram(self):
        """Queue three ping commands and start running them."""
        tasks = [("ping", ["8.8.8.8"]),
                 ("ping", ["8.8.8.8"]),
                 ("ping", ["8.8.8.8"])]

        self.progressBar.setMaximum(len(tasks))
        # Start every run from zero instead of showing the previous run's value.
        self.progressBar.setValue(0)
        self.manager.start_tasks(tasks)

    def initUI(self):
        """Build the widgets and connect the TaskManager signals to the UI."""
        layout = QtWidgets.QVBoxLayout()
        self.runButton = QtWidgets.QPushButton('Run')

        self.runButton.clicked.connect(self.callProgram)

        self.output = QtWidgets.QTextEdit()

        self.progressBar = QtWidgets.QProgressBar()

        layout.addWidget(self.output)
        layout.addWidget(self.runButton)
        layout.addWidget(self.progressBar)

        centralWidget = QtWidgets.QWidget()
        centralWidget.setLayout(layout)
        self.setCentralWidget(centralWidget)

        self.manager = TaskManager(self)
        self.manager.progressChanged.connect(self.dataReady)
        # The Run button is disabled while a batch is in progress.
        self.manager.started.connect(partial(self.runButton.setEnabled, False))
        self.manager.finished.connect(partial(self.runButton.setEnabled, True))


def main():
    """Create the Qt application, show the main window and run the event loop."""
    application = QtWidgets.QApplication(sys.argv)
    window = gui()
    window.show()
    # exec_() blocks until the application quits; its result is the exit status.
    exit_code = application.exec_()
    sys.exit(exit_code)


if __name__ == '__main__':
    main()

---

更新:

概括这个问题,可以说第n个进程需要默认参数和附加参数.

Generalizing the problem, it can be said that the n-th process needs default arguments and additional arguments.

  • 默认参数是独立且固定的

  • The default arguments are independent and fixed

附加参数通过某个函数依赖于前面的过程.

The additional arguments depend on the previous process through a certain function.

因此可以使用以下表达式进行概括:

So it can be generalized using the following expressions:

result_n = process_n(default_arguments, additional_args_n)
additional_args_n = fun_n(result_n-1)

或使用下图:

 ________      _________      ________      _________      ________
|        |    |         |    |        |    |         |    |        |
|        |    |         |    |        |    |         |    |        |
| TASK-1 |--->| FUN1TO2 |--->| TASK-2 |--->| FUN2TO3 |--->| TASK-3 |
|        |    |         |    |        |    |         |    |        |
|________|    |_________|    |________|    |_________|    |________|

然后为了构建过程,创建了以下字典:

Then to structure the process the following dictionary is created:

task_n = {"program": program, "args": default_arguments, "function": fun}

其中 fun 是用于处理此任务的输出以获得下一个任务的附加参数的函数.

Where fun is the function used to process the output of this task to obtain additional arguments for the next task.

在下面的示例中,我将使用 scriptX.py 作为程序而不是 ping.

In the following example, I will use scriptX.py as a program instead of ping.

#script1.py
import sys

def foo(*args):
    """Return "1-" followed by the concatenated strings of the single argument.

    Exactly one positional argument, an iterable of strings, is expected.
    """
    (pieces,) = args
    joined = "".join(pieces)
    return "1-" + joined

# Everything after the script name is handed to foo() as one list.
arg = sys.argv[1:]
output = foo(arg)
print(output)

#script2.py
import sys

def foo(*args):
    """Return "2-" followed by the concatenated strings of the single argument.

    Exactly one positional argument, an iterable of strings, is expected.
    """
    (pieces,) = args
    joined = "".join(pieces)
    return "2-" + joined

# Everything after the script name is handed to foo() as one list.
arg = sys.argv[1:]
output = foo(arg)
print(output)

#script3.py
import sys

def foo(*args):
    """Return "3-" followed by the concatenated strings of the single argument.

    Exactly one positional argument, an iterable of strings, is expected.
    """
    (pieces,) = args
    joined = "".join(pieces)
    return "3-" + joined

# Everything after the script name is handed to foo() as one list.
arg = sys.argv[1:]
output = foo(arg)
print(output)

fun1to2 是使用 process-1 的结果生成 process-2 所需的附加参数并且必须返回它的函数.fun2to3 的情况类似.

fun1to2 is the function that uses the result of process-1 to generate the additional argument required by process-2, and it must return that argument. The case is similar for fun2to3.

def fun1to2(*args):
    """Return the additional argument for process 2.

    The positional arguments are accepted but not used by this placeholder.
    """
    extra_argument = "additional_arg_for_process2_from_result1"
    return extra_argument

def fun2to3(*args):
    """Return the additional argument for process 3.

    The positional arguments are accepted but not used by this placeholder.
    """
    extra_argument = "additional_arg_for_process3_from_result2"
    return extra_argument

因此基于上述我们创建任务:

so based on the above we create the tasks:

# One dictionary per task; "function" turns this task's output into the
# additional argument of the next task and is omitted for the last task.
tasks = [
    {
        "program": "python",
        "args": ["scripts/script1.py", "default_argument1"],
        "function": fun1to2,
    },
    {
        "program": "python",
        "args": ["scripts/script2.py", "default_argument2"],
        "function": fun2to3,
    },
    {
        "program": "python",
        "args": ["scripts/script3.py", "default_argument3"],
    },
]

综合以上,最终实现如下:

Using all of the above, the final implementation is as follows:

import sys

from PyQt5 import QtCore, QtWidgets
from functools import partial

class TaskManager(QtCore.QObject):
    """Run chained external commands one after another on a single QProcess.

    Each task is a dictionary with the keys ``"program"`` (executable),
    ``"args"`` (list of default arguments) and, optionally, ``"function"``:
    a callable that receives the task's standard output and returns the
    additional argument(s) for the next task.

    Signals:
        started: emitted once when a batch of tasks begins.
        finished: emitted once when no task is left to run.
        progressChanged(int, QByteArray): emitted after each task with the
            number of tasks handled so far and that task's standard output.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal()
    progressChanged = QtCore.pyqtSignal(int, QtCore.QByteArray)

    def __init__(self, parent=None):
        QtCore.QObject.__init__(self, parent)
        self._tasks = iter(())
        self._progress = 0
        self._currentTask = None
        self._process = QtCore.QProcess(self)
        self._process.finished.connect(self.handleFinished)
        # A program that cannot be launched never emits finished(); without
        # this connection the queue would stall on such a task.
        self._process.errorOccurred.connect(self._handleError)

    def start_tasks(self, tasks):
        """Start running *tasks*, an iterable of task dictionaries.

        ``started`` is emitted before the first process is launched. If
        *tasks* is empty, ``finished`` is emitted right away so listeners
        always receive a matching started/finished pair.
        """
        self._tasks = iter(tasks)
        self._progress = 0
        self.started.emit()
        if not self.fetchNext():
            self.finished.emit()

    def fetchNext(self, additional_args=None):
        """Launch the next queued task.

        Args:
            additional_args: extra argument(s) derived from the previous
                task. A string is passed as one argument; any other iterable
                contributes one argument per item.

        Returns:
            True if a task was launched, False if the queue is exhausted.
        """
        try:
            self._currentTask = next(self._tasks)
        except StopIteration:
            return False
        program = self._currentTask.get("program")
        # Work on a copy so the caller's task dictionary is never modified.
        args = list(self._currentTask.get("args") or [])
        if additional_args is not None:
            if isinstance(additional_args, str):
                # Extending a list with a string would add it character by
                # character; a string is meant to be a single argument.
                args.append(additional_args)
            else:
                args.extend(additional_args)
        self._process.start(program, args)
        return True

    def processCurrentTask(self):
        """Report the ended task's stdout and compute the next task's extras.

        Returns:
            Whatever the task's ``"function"`` returns for the output, or
            None when the task defines no function.
        """
        output = self._process.readAllStandardOutput()
        self._progress += 1
        fun = self._currentTask.get("function")
        res = None
        if fun:
            res = fun(output)
        self.progressChanged.emit(self._progress, output)
        return res

    def handleFinished(self):
        """Report the task that just ended, then run the next one or finish."""
        args = self.processCurrentTask()
        if not self.fetchNext(args):
            self.finished.emit()

    def _handleError(self, error):
        """Keep the queue moving when a program could not be started.

        Other errors (for example a crash) are followed by QProcess.finished
        and are therefore handled by handleFinished.
        """
        if error == QtCore.QProcess.FailedToStart:
            # Deferred to the event loop: the error may be delivered while
            # QProcess is still inside start(), where restarting is not safe.
            QtCore.QTimer.singleShot(0, self.handleFinished)


def fun1to2(args):
    """Return the additional argument for process 2.

    *args* (the output of process 1) is accepted but not used by this
    placeholder.
    """
    extra_argument = "-additional_arg_for_process2_from_result1"
    return extra_argument

def fun2to3(args):
    """Return the additional argument for process 3.

    *args* (the output of process 2) is accepted but not used by this
    placeholder.
    """
    extra_argument = "-additional_arg_for_process3_from_result2"
    return extra_argument

class gui(QtWidgets.QMainWindow):
    """Main window: a text log, a Run button and a progress bar fed by TaskManager."""

    def __init__(self):
        super(gui, self).__init__()
        self.initUI()

    def dataReady(self, progress, result):
        """Show one task's output and update the progress bar.

        Args:
            progress: number of tasks handled so far.
            result: raw standard output of the task (QByteArray).
        """
        # "replace" keeps the slot from raising UnicodeDecodeError when a
        # command prints bytes that are not valid UTF-8.
        self.output.append(str(result, "utf-8", "replace"))
        self.progressBar.setValue(progress)

    def callProgram(self):
        """Queue the three chained scripts and start running them."""
        tasks = [{"program": "python", "args": ["scripts/script1.py", "default_argument1"], "function": fun1to2},
                 {"program": "python", "args": ["scripts/script2.py", "default_argument2"], "function": fun2to3},
                 {"program": "python", "args": ["scripts/script3.py", "default_argument3"]}]

        self.progressBar.setMaximum(len(tasks))
        # Start every run from zero instead of showing the previous run's value.
        self.progressBar.setValue(0)
        self.manager.start_tasks(tasks)

    def initUI(self):
        """Build the widgets and connect the TaskManager signals to the UI."""
        layout = QtWidgets.QVBoxLayout()
        self.runButton = QtWidgets.QPushButton('Run')

        self.runButton.clicked.connect(self.callProgram)

        self.output = QtWidgets.QTextEdit()

        self.progressBar = QtWidgets.QProgressBar()

        layout.addWidget(self.output)
        layout.addWidget(self.runButton)
        layout.addWidget(self.progressBar)

        centralWidget = QtWidgets.QWidget()
        centralWidget.setLayout(layout)
        self.setCentralWidget(centralWidget)

        self.manager = TaskManager(self)
        self.manager.progressChanged.connect(self.dataReady)
        # The Run button is disabled while a batch is in progress.
        self.manager.started.connect(partial(self.runButton.setEnabled, False))
        self.manager.finished.connect(partial(self.runButton.setEnabled, True))


def main():
    """Create the Qt application, show the main window and run the event loop."""
    application = QtWidgets.QApplication(sys.argv)
    window = gui()
    window.show()
    # exec_() blocks until the application quits; its result is the exit status.
    exit_code = application.exec_()
    sys.exit(exit_code)


if __name__ == '__main__':
    main()

结果:

这篇关于如何在 PyQt5 中排队 QProcesses?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆