Can very long I/O processes be handled by threads?


Question

I'm starting a new topic here that is linked to this question.

I invite you to read its background to get the overall idea.

So I have a download function that relies on a Python 3.2 API (developed by a private company). Each file can take up to 400 seconds to download.

Obviously, I have more than one file to download, so I've spent days trying to put each download process in a thread pool. Each thread in the pool should be completely independent of the GUI main thread. When one of them finishes, it should just send a signal to the GUI.

I did several tests, but whatever technique I used, either:

  1. the GUI froze; or
  2. the results only arrived once all the threads had finished, instead of one by one as each download completed (which is what I want).

I suspect that the download method provided by the API is a blocking function that can't be threaded.

So my question is simple: how can I tell whether an I/O method can be handled in a thread?
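One empirical way to answer this is a "heartbeat" test: run the suspect call in a worker thread and count how often the calling thread still gets to run meanwhile. A blocking call that releases the GIL while it waits (typical for socket/file I/O and `time.sleep`) lets the caller keep ticking; a C extension call that holds the GIL starves the caller, which is exactly what a frozen GUI looks like. The helper `heartbeat_ratio` below is a hypothetical sketch, not part of any library, and the result is a heuristic rather than proof.

```python
import threading
import time


def heartbeat_ratio(func, *args, interval=0.02):
    """Run func(*args) in a worker thread and measure how often the calling
    thread still gets scheduled while it runs.

    Returns ticks / expected_ticks: close to 1.0 means the call let other
    threads run (it released the GIL while blocking); close to 0 means the
    calling thread was starved for most of the call.
    """
    worker = threading.Thread(target=func, args=args, daemon=True)
    start = time.monotonic()
    worker.start()
    ticks = 0
    while worker.is_alive():
        time.sleep(interval)  # stands in for the GUI event loop doing work
        ticks += 1
    elapsed = time.monotonic() - start
    return ticks / max(elapsed / interval, 1.0)
```

For example, `heartbeat_ratio(time.sleep, 0.5)` should come out close to 1.0; calling it with the real API method (for instance `surface.download_bathymetry` and a file path) and getting a ratio near 0 would suggest that method holds the GIL and needs a separate process rather than a thread.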

November 24, 2017 Update

Below is a first draft (using multiprocessing.Pool with map_async) that partially meets my expectations. As you will see, I unfortunately had to insert a "busy-waiting loop" to get some information about what was going on into the QPlainTextEdit.

The results of the tasks are only returned at the end of the whole run (that is how map_async behaves), which is not exactly what I'm looking for. I would like something closer to real time: each completed task should show its message on the console immediately.
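For contrast with map_async, the pool method `imap_unordered` yields each result as soon as its task finishes. The sketch below swaps in `multiprocessing.pool.ThreadPool` (which exposes the same `imap_unordered`/`apply_async` interface as `multiprocessing.Pool`) so it runs without pickling, and `fake_download` is a hypothetical stand-in for the real `download`. Note the assumption: the consuming `for` loop still blocks whatever thread runs it, so in a GUI it would have to live in a worker thread or be replaced by callbacks.

```python
import random
import time
from multiprocessing.pool import ThreadPool


def fake_download(args):
    """Hypothetical stand-in for download(surface_id, index)."""
    surface_id, index = args
    t = time.time()
    time.sleep(random.uniform(0.01, 0.1))  # simulate a slow transfer
    return 'Surface #%d (%d) - %.2f s' % (index, surface_id, time.time() - t)


def run_incrementally(ids, workers=4):
    """Collect messages in completion order instead of waiting for all."""
    jobs = [(surface_id, index) for index, surface_id in enumerate(ids)]
    messages = []
    with ThreadPool(workers) as pool:
        # imap_unordered hands back each result as soon as it is ready,
        # whereas map_async(...).get() returns only when every task is done.
        for message in pool.imap_unordered(fake_download, jobs):
            messages.append(message)
            print(message)  # in the GUI this would be one signal/event per task
    return messages
```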

import time
import multiprocessing
import private.library as bathy
from PyQt4 import QtCore, QtGui
import os
import sys

user = 'user'
password = 'password'
server = 'server'
basename = 'basename'

workers = multiprocessing.cpu_count()

node = bathy.NodeManager(user, password, server)
database = node.get_database(basename)

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
       961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
       4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
       1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
       1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
       9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
       5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
       5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
       5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
       5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)


# ---------------------------------------------------------------------------------
def download(surface_id, index):
    global node
    global database

    t = time.time()
    message = 'Surface #%d - Process started\n' % index

    surface = database.get_surface(surface_id)
    metadata = surface.get_metadata()
    file_path = os.path.join("C:\\Users\\philippe\\Test_Download",
                             metadata["OBJNAM"] + ".surf")

    try:
        surface.download_bathymetry(file_path)
    except RuntimeError as error:
        message += "Error : " + str(error).split('\n')[0] + '\n'
    finally:
        message += ('Process ended : %.2f s\n' % (time.time() - t))

    return message


# ---------------------------------------------------------------------------------
def pass_args(args):
    # Method to pass multiple arguments to download (multiprocessing.Pool)
    return download(*args)


# ---------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    def __init__(self):
        super(Console, self).__init__()

        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.setModal(True)

        self.verticalLayout = QtGui.QVBoxLayout(self)

        # Text edit
        # -------------------------------------------------------------------------

        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close | 
                                           QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------

        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close),
                     QtCore.SIGNAL('clicked()'),
                     self.button_cancel_clicked)
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Ok),
                     QtCore.SIGNAL('clicked()'),
                     self.button_ok_clicked)

        # Post initialization
        # -------------------------------------------------------------------------
        self.pool = multiprocessing.Pool(processes=workers)

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        jobs_args = [(surface_id, index) for index, surface_id in enumerate(ids)]
        # "async" is a reserved keyword since Python 3.7, so use another name
        result = self.pool.map_async(pass_args, jobs_args)
        self.pool.close()

        # Busy waiting loop
        while True:
            # The AsyncResult returned by map_async has a ready() method and a
            # private _number_left attribute (an implementation detail)
            if result.ready():
                self.write_stream("All tasks completed\n")
                self.pool.join()
                for line in result.get():
                    self.write_stream(line)
                break

            remaining = result._number_left
            self.write_stream("Waiting for %d task(s) to complete...\n" % remaining)
            time.sleep(0.5)


    # Other functions
    # -----------------------------------------------------------------------------
    def write_stream(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)
        app.processEvents()


# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()

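Questions

  1. At first glance, does the code above contain any conceptual error?
  2. In this particular case, should I use the apply_async method to get something more interactive?
  3. Could you show me how to use a callback function to post a custom event that updates the console (the approach suggested by @ekhumoro)?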
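Regarding the first question, one thing worth checking is that the busy-waiting loop runs on the GUI thread and only stays responsive through repeated `processEvents()` calls. A non-blocking alternative is to check the AsyncResult once per tick of a periodic timer (for example a QTimer's timeout slot, which is assumed here and not shown). `poll_once` is a hypothetical helper, and `ThreadPool` is swapped in only so the sketch runs standalone.

```python
import time
from multiprocessing.pool import ThreadPool


def poll_once(result, write):
    """Check an AsyncResult without blocking; return True when finished.

    Intended to be called periodically (e.g. from a timer callback)
    instead of looping with time.sleep() on the GUI thread.
    """
    if result.ready():
        for line in result.get():
            write(line)
        return True
    write('Still waiting...\n')
    return False
```

Note that this only removes the blocking loop; with map_async the individual results still arrive all at once at the end.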


November 25, 2017 Update

I tried apply_async:

def button_ok_clicked(self):
    # Pool.apply_async returns immediately instead of
    # waiting for the result
    for index, surface_id in enumerate(ids):
        self.pool.apply_async(download,
                              args=(surface_id, index),
                              callback=self.write_stream)
    self.pool.close()

With this callback:

def write_stream(self, text):
    # Called once per completed task; multiprocessing runs Pool callbacks
    # in a helper thread of the parent process, not in the GUI thread
    self.text_edit.insertPlainText(text)
    cursor = self.text_edit.textCursor()
    self.text_edit.setTextCursor(cursor)
    # Update the text edit
    app.processEvents()

Unfortunately, doing it this way crashes the application. I think I'll have to add a locking mechanism to prevent all the tasks from writing to the text edit at the same time.
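A lock may not be the right fix. Pool callbacks are not executed on the thread that called `apply_async`; they run in a helper thread that the pool uses to collect results, and Qt widgets are generally only safe to touch from the GUI thread. The sketch below checks where callbacks run. It uses `ThreadPool` for self-containment (as far as I know, `multiprocessing.Pool` dispatches callbacks the same way in the parent process), and `double` is just a placeholder task.

```python
import threading
from multiprocessing.pool import ThreadPool


def double(x):
    return 2 * x  # placeholder task


def callbacks_on_main_thread(n=5):
    """Submit n tasks with apply_async and record, for each callback,
    whether it ran on the main (GUI) thread."""
    flags = []

    def callback(result):
        flags.append(threading.current_thread() is threading.main_thread())

    pool = ThreadPool(2)
    for i in range(n):
        pool.apply_async(double, (i,), callback=callback)
    pool.close()
    pool.join()  # returns once every result has been handled
    return flags
```

If every flag comes back `False`, calling `insertPlainText` from the callback means touching a widget from a non-GUI thread, which a lock around the writes does not change.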

Recommended answer

Below is a simplified version of your example script that shows how to post a custom event using a callback. Each job is processed separately via apply_async, so a simple counter is updated to indicate when all the jobs have been completed.
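Stripped of Qt, the pattern is: each callback decrements a countdown and hands `(message, complete)` to the GUI thread through a thread-safe channel. In the sketch below a `queue.Queue` stands in for `postEvent` and the consuming loop stands in for `event()`; `fake_download` is hypothetical and `ThreadPool` is swapped in for `multiprocessing.Pool` so it runs standalone. Because the pool runs all callbacks one at a time in its single result-handler thread, the countdown itself needs no lock.

```python
import queue
import random
import time
from multiprocessing.pool import ThreadPool


def fake_download(surface_id, index):
    """Hypothetical stand-in for the real download() call."""
    time.sleep(random.uniform(0.01, 0.05))
    return 'Surface #%d (%d) done' % (index, surface_id)


def run_with_countdown(ids, workers=4):
    events = queue.Queue()  # stands in for QApplication.postEvent
    remaining = len(ids)

    def callback(message):
        # Runs in the pool's result-handler thread, one call at a time.
        nonlocal remaining
        remaining -= 1
        events.put((message, remaining == 0))

    pool = ThreadPool(workers)
    for index, surface_id in enumerate(ids):
        pool.apply_async(fake_download, (surface_id, index), callback=callback)
    pool.close()

    # Consumer side: in the GUI this is event() running on the main thread.
    received = []
    while True:
        message, complete = events.get()
        received.append(message)
        if complete:
            break
    pool.join()
    return received
```

One caveat of this design: if a task raises, its callback never fires and the countdown never reaches zero, so real code would also want an `error_callback` that decrements it.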

import sys, time, random, multiprocessing
from PyQt4 import QtCore, QtGui

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
       961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
       4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
       1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
       1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
       9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
       5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
       5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
       5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
       5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)

def download(surface_id, index):
    t = time.time()
    message = 'Surface #%s (%s) - Process started\n' % (index, surface_id)
    time.sleep(random.random())
    message += 'Process ended : %.2f s\n' % (time.time() - t)
    return message

def pass_args(args):
    return download(*args)

class CustomEvent(QtCore.QEvent):
    DownloadComplete = QtCore.QEvent.registerEventType()

    def __init__(self, typeid, *args):
        super().__init__(typeid)
        self.data = args

class Console(QtGui.QDialog):
    def __init__(self):
        super().__init__()
        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.verticalLayout = QtGui.QVBoxLayout(self)
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(
            QtGui.QDialogButtonBox.Close | QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)
        self.button_box.button(QtGui.QDialogButtonBox.Close
            ).clicked.connect(self.button_cancel_clicked)
        self.button_box.button(QtGui.QDialogButtonBox.Ok
            ).clicked.connect(self.button_ok_clicked)
        self.pool = multiprocessing.Pool(None)

    def event(self, event):
        if event.type() == CustomEvent.DownloadComplete:
            message, complete = event.data
            self.write_stream(message)
            if complete:
                self.write_stream('Downloads complete!')
        return super().event(event)

    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        total = len(ids)
        def callback(message):
            nonlocal total
            total -= 1
            QtGui.qApp.postEvent(self, CustomEvent(
                CustomEvent.DownloadComplete, message, not total))
        for index, surface_id in enumerate(ids):
            self.pool.apply_async(
                pass_args, [(surface_id, index)], callback=callback)

    def write_stream(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)

if __name__ == '__main__':

    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()
