Joblib Parallel uses only one core if started from QThread


Question

I'm developing a GUI that does some heavy number crunching. To speed things up, I use joblib's Parallel execution together with PyQt's QThreads to keep the GUI from becoming unresponsive. The Parallel execution works fine on its own, but when it is embedded in the GUI and run in its own thread, it uses only one of my 4 cores. Am I missing something fundamental about threading/multiprocessing?

Here is a rough sketch of my setup:

class ThreadRunner(QtCore.QObject):

    start = QtCore.pyqtSignal()
    result_finished = QtCore.pyqtSignal(np.ndarray)

    def __init__(self, function, *args, **kwargs):
        super(ThreadRunner, self).__init__()

        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start.connect(self.run)

    def run(self):
        print("New Thread started")
        result = self.function(*self.args, **self.kwargs)
        self.result_finished.emit(result)

class Gui(QtGui.QMainWindow, form_class):
    def __init__(self, cl_args, parent=None):
        super(Gui, self).__init__()
        #other stuff

    def start_thread(self, fun, *args, **kwargs):
        self.runner = ThreadRunner(fun, *args, **kwargs)
        self.thread = QtCore.QThread() 
        self.runner.moveToThread(self.thread)
        # more stuff for catching results

def slice_producer(data):
    n_images, rows, cols = data.shape[:3]
    for r in range(rows):
        yield np.copy(data[:,r,...])

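def run_parallel(data, *args, **kwargs):
    # memory (a joblib.Memory instance) and do_long_computation are defined elsewhere
    results = joblib.Parallel(
        n_jobs=4,
        verbose=12,
        pre_dispatch='1.5*n_jobs'
    )(
        joblib.delayed(memory.cache(do_long_computation))(data_slice, **kwargs)
        for data_slice in slice_producer(data)
    )
    return results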

I hope this is neither too long nor too vague. I use PyQt4 4.11.3 and joblib 0.8.4.

I checked my code again and noticed the following warning:

UserWarning: Multiprocessing backed parallel loops cannot 
be nested below threads, setting n_jobs=1

This refines my question to the following: how can I run a multiprocessing process in a separate thread?
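
The warning suggests that the library looks at which thread it is called from before starting worker processes. A minimal sketch of that kind of check, using only the standard library, might look as follows. The helper names `in_main_thread` and `check_from_worker` are hypothetical, and this illustrates the idea only; it is an assumption about the mechanism, not joblib's actual code.

```python
import threading


def in_main_thread():
    """Return True only when called from the interpreter's main thread."""
    return threading.current_thread() is threading.main_thread()


def check_from_worker():
    """Run the check inside a secondary thread and return what it saw."""
    seen = []
    worker = threading.Thread(target=lambda: seen.append(in_main_thread()))
    worker.start()
    worker.join()
    return seen[0]


if __name__ == "__main__":
    print("called directly:", in_main_thread())
    print("called from a worker thread:", check_from_worker())
```

A slot running in a QThread is likewise not executing in the main thread, which would be consistent with the fallback to `n_jobs=1`.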

Recommended answer

Okay, thanks to ekhumoro I arrived at something that works: it uses only one instance of multiprocessing.Pool and works with callbacks. The only drawback is that errors in the child process fail silently (to see this, change results to result in f_wrapper). Here is the code for future reference:

from PyQt4.QtCore import *
from PyQt4.QtGui import *
import multiprocessing
import sys
import numpy as np
import time

def f(data_slice, **kwargs):
    '''This is a time-intensive function, which we do not want to alter
    '''
    data = 0
    for row in range(data_slice.shape[0]):
        for col in range(data_slice.shape[1]):
            data += data_slice[row,col]**2
    time.sleep(0.1)
    return data, 3, 5, 3 # some dummy calculation results


def f_wrapper(row, data_slice,  **kwargs):
    results = f(data_slice, **kwargs)
    return row, results

class MainWindow(QMainWindow): #You can only add menus to QMainWindows

    def __init__(self):
        super(MainWindow, self).__init__()
        self.pool = multiprocessing.Pool(processes=4)

        button1 = QPushButton('Connect', self)
        button1.clicked.connect(self.apply_connection)
        self.text = QTextEdit()

        vbox1 = QVBoxLayout()
        vbox1.addWidget(button1)
        vbox1.addWidget(self.text)
        myframe = QFrame()
        myframe.setLayout(vbox1)

        self.setCentralWidget(myframe)
        self.show() #display and activate focus
        self.raise_()


    def apply_connection(self):
        self.rows_processed = list()
        self.max_size = 1000
        data = np.random.random(size = (100, self.max_size,self.max_size))
        kwargs = {'some_kwarg' : 1000}
        for row in range(data.shape[1]):
            slice = data[:,row, :]
            print("starting f for row %d" % row)
            result = self.pool.apply_async(f_wrapper, 
                                           args = (row, slice), 
                                           kwds = kwargs,
                                           callback=self.update_gui)
            #~ result.get() # blocks gui, but raises errors for debugging


    def update_gui(self, result):
        row, func_result = result
        self.rows_processed.append(row)
        print(len(self.rows_processed))
        print(func_result)  # or do something more intelligent
        self.text.append('Applied connection. Row = %d\n' % row)
        if len(self.rows_processed) == self.max_size:
            print("Done!")




if __name__ == '__main__':
    app = QApplication(sys.argv)
    gui = MainWindow()
    app.exec_()

If there is a nice way to capture errors, that would be a nice bonus.
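
One possible way to surface those silent failures, assuming the worker can be wrapped, is to catch the exception inside the child process and send the formatted traceback back as part of the result, so the callback in the parent process can display it. This is a sketch, not part of the code above: `safe_wrapper`, `handle_result` and `run_demo` are hypothetical names, and `f` here is a small stand-in for the real computation.

```python
import multiprocessing
import traceback


def f(data_slice, **kwargs):
    """Stand-in for the time-intensive function; fails on empty input."""
    if not data_slice:
        raise ValueError("empty slice")
    return sum(x ** 2 for x in data_slice)


def safe_wrapper(row, data_slice, **kwargs):
    """Run f in the child process and return (row, result, error_text).

    On failure the traceback is returned as text, so the parent's
    callback is still invoked and can report the error.
    """
    try:
        return row, f(data_slice, **kwargs), None
    except Exception:
        return row, None, traceback.format_exc()


def handle_result(result):
    """Turn a (row, result, error_text) tuple into a printable message."""
    row, value, error_text = result
    if error_text is not None:
        return "Row %d failed:\n%s" % (row, error_text)
    return "Row %d -> %r" % (row, value)


def run_demo():
    """Submit one good and one failing slice and collect the messages."""
    messages = []
    pool = multiprocessing.Pool(processes=2)
    for row, data_slice in enumerate([[1, 2, 3], []]):
        pool.apply_async(safe_wrapper, args=(row, data_slice),
                         callback=lambda r: messages.append(handle_result(r)))
    pool.close()
    pool.join()
    return messages


if __name__ == "__main__":
    for message in run_demo():
        print(message)
```

In the GUI above, `update_gui` would take the place of `handle_result` and could append the error text to the `QTextEdit`. On Python 3, `apply_async` also accepts an `error_callback` argument, which may be a simpler option when wrapping the worker is not desirable.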
