Combining multithreading and multiprocessing with concurrent.futures


Question


I have a function that is both highly I/O-dependent and CPU-intensive. I tried to parallelize it with multiprocessing plus multithreading, but it gets stuck. This question has been asked before, but in a different setting. My function is fully independent and returns nothing. Why does it get stuck? How can it be fixed?

import concurrent.futures
import os
import numpy as np
import time


ids = [1,2,3,4,5,6,7,8]

def f(x):
    time.sleep(1)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads = 2):

    slices = np.array_split(AccountNumbers, n_threads)
    slices = [list(i) for i in slices]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(f, slices)



def parallelize_distribute(AccountNumbers, f, n_threads = 2, n_processors = os.cpu_count()):

    slices = np.array_split(AccountNumbers, n_processors)
    slices = [list(i) for i in slices]

    with concurrent.futures.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map( lambda x: multithread_accounts(x, f, n_threads = n_threads) , slices)
        
parallelize_distribute(ids, f, n_processors=2, n_threads=2)


Answer


Sorry, but I can't make time to explain all this, so I'll just give code "that works". I urge you to start with something simpler, because the learning curve is non-trivial. Leave numpy out of it at first; stick to only threads at first; then move to only processes; and unless you're an expert don't try to parallelize anything other than named module-level functions (no, not function-local anonymous lambdas).


As often happens, the error messages you "should be" getting are being suppressed because they occur asynchronously so there's no good way to report them. Liberally add print() statements to see how far you're getting.
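To make the suppressed-error point concrete, here is a minimal sketch (not from the original answer; the `square` helper and the timeout value are assumptions). A `ProcessPoolExecutor` has to pickle the callable it ships to worker processes, and a lambda cannot be pickled, but that failure only surfaces when you actually ask a future for its result:

```python
import concurrent.futures as cf

def square(x):
    # a named module-level function pickles fine
    return x ** 2

if __name__ == "__main__":
    with cf.ProcessPoolExecutor(max_workers=2) as executor:
        ok = executor.submit(square, 3)
        print(ok.result())  # → 9

        # like the lambda in the question's code
        bad = executor.submit(lambda x: x ** 2, 3)
        try:
            bad.result(timeout=30)
        except Exception as exc:
            # on recent CPython this surfaces a pickling error here,
            # instead of failing silently the way executor.map does
            # when its result iterator is never consumed
            print("worker failed:", type(exc).__name__)
```

Calling `.result()` (or iterating the generator returned by `executor.map`) is what re-raises worker-side and submission-side exceptions in the parent; code that fires off `map` and never consumes it sees nothing.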


Note: I stripped numpy out of this, and added the stuff needed so it runs on Windows too. I expect using numpy's array_split() instead would work fine, but I didn't have numpy handy on the machine I was on at the time.

import concurrent.futures as cf
import os
import time

def array_split(xs, n):
    from itertools import islice
    it = iter(xs)
    result = []
    q, r = divmod(len(xs), n)
    for i in range(r):
        result.append(list(islice(it, q+1)))
    for i in range(n - r):
        result.append(list(islice(it, q)))
    return result
    
ids = range(1, 11)

def f(x):
    print(f"called with {x}")
    time.sleep(5)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        for slice in array_split(AccountNumbers, n_threads):
            executor.map(f, slice)

def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
    slices = array_split(AccountNumbers, n_processors)
    print("top slices", slices)
    with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map(multithread_accounts, slices,
                                           [f] * len(slices),
                                           [n_threads] * len(slices))

if __name__ == "__main__":
    parallelize_distribute(ids, f, n_processors=2, n_threads=2)


BTW, I suggest this makes more sense for the threaded part:

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        executor.map(f, AccountNumbers)


That is, there's really no need to split the list yourself here - the threading machinery will split it up itself. It's possible you missed that in your original attempts, because the ThreadPoolExecutor() call in the code you posted forgot to specify the max_workers argument.
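The same simplification arguably applies to the process layer, since `ProcessPoolExecutor.map` accepts a `chunksize` argument that batches items per worker. A hedged sketch (the function names and the chunk size here are illustrative, not from the answer above):

```python
import concurrent.futures as cf
import os
import time

def f(x):
    time.sleep(0.1)  # stand-in for the I/O-bound part
    return x ** 2

def parallelize(account_numbers, n_processors=os.cpu_count()):
    with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
        # chunksize ships items to workers in batches, replacing the
        # manual array_split; consuming the iterator via list() also
        # re-raises any exception a worker hit, so failures are not
        # silently swallowed
        return list(executor.map(f, account_numbers, chunksize=2))

if __name__ == "__main__":
    print(parallelize(range(1, 11), n_processors=2))
```

As with the threaded version, the pool machinery handles the splitting; the manual slicing in the question's code only adds places for things to go wrong.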
