Python Multiprocessing (Splitting data in smaller chunks - multiple function arguments)


Problem Description

Note from 22.02.21: Potentially, my problem could also be solved by more efficient memory usage instead of multiprocessing, since I realized that the memory load gets very high and might be a limiting factor here.

I'm trying to reduce the time that my script needs to run by making use of multiprocessing. In the past I got some good tips about increasing the speed of the function itself (Increase performance of np.where() loop), but now I would like to make use of all cores of a 32-core workstation. My function compares entries of two lists (X and Y) against reference lists Q and Z. For every element in X/Y, it checks whether X[i] occurs somewhere in Q and whether Y[i] occurs in Z. If X[i] == Q[s] AND Y[i] == Z[s], it returns the index "s". (Note: My real data consists of DNA sequencing reads, and I need to map my reads to the reference.)
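
For example (a toy illustration of the matching described above, not my real code):

# Toy illustration: for each pair (X[i], Y[i]), collect every index s
# where Q[s] == X[i] and Z[s] == Y[i].
Q = ['a', 'b', 'a', 'c']
Z = ['1', '2', '2', '1']
X = ['a', 'c', 'a']
Y = ['2', '1', '9']

lookup = {}
for s, (q, z) in enumerate(zip(Q, Z)):
    lookup.setdefault((q, z), []).append(s)

print([lookup.get((x, y), []) for x, y in zip(X, Y)])  # [[2], [3], []]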

What I tried so far:

  • Splitting my long lists X and Y into even chunks (n-chunks, where n == cpu_count)
  • Trying concurrent.futures.ProcessPoolExecutor() to run the function for each sublist in parallel and, at the end, combining the result of each process into one final dictionary (matchdict). (--> see the commented-out section)

My problem:

  • When I uncomment the multiprocessing section, all cores are used, but it eventually fails with an error (index out of range) that I have not been able to solve yet. (--> Hint: lower N to 1000 and you will see the error immediately without having to wait forever)

Does anyone know how to solve this, or can you suggest a better approach to using multiprocessing in my code?

Here is the code:

import numpy as np
import multiprocessing
import concurrent.futures

np.random.seed(1)

def matchdictfunc(index,x,y,q,z):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    
    return matchdict

def split(a, n):  # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index,X,Y,Q,Z):  # split large lists X and Y in n-even parts (n = cpu_count), make new list containing n-times Q and Z (to feed Q and Z for every process)
    cpu_count = multiprocessing.cpu_count()

    #create multiple chunks for X and Y and index:
    index_split = split(index,cpu_count)
    X_split = split(X,cpu_count)
    Y_split = split(Y,cpu_count)

    # create list with several times Q and Z since it needs to be same length as X_split etc:
    Q_mult = []  
    Z_mult = []
    for _ in range(cpu_count):
        Q_mult.append(Q)
        Z_mult.append(Z)
    return index_split,X_split,Y_split,Q_mult,Z_mult

# N will finally scale up to 10^9
N = 10000000
M = 300

index = [str(x) for x in list(range(N))]
X = np.random.randint(M, size=N)
Y = np.random.randint(M, size=N)

# Q and Z size is fixed at 120000
Q = np.random.randint(M, size=120000)
Z = np.random.randint(M, size=120000)

# convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
X = np.char.mod('%d', X).tolist()
Y = np.char.mod('%d', Y).tolist()
Q = np.char.mod('%d', Q).tolist()
Z = np.char.mod('%d', Z).tolist()

# single-core:
matchdict = matchdictfunc(index,X,Y,Q,Z)

# split lists to number of processors (cpu_count)
index_split,X_split,Y_split,Q_mult,Z_mult = splitinput(index,X,Y,Q,Z)  

## Multiprocessing attempt - FAILS! (index out of range)
# finallist = []
# if __name__ == '__main__':
#     with concurrent.futures.ProcessPoolExecutor() as executor:
#         results = executor.map(matchlistfunc,X_split,Y_split,Q_mult,Z_mult)
#         for result in results:
#             finallist.append(result)
    
#         matchdict = {}
#         for d in finallist:
#             matchdict.update(d)

Answer

Your function matchdictfunc currently has arguments x, y, q, z but in fact does not use them, although in the multiprocessing version it will need to use its arguments. There is also no need for function splitinput to replicate Q and Z into the returned values Q_mult and Z_mult. Currently, matchdictfunc is expecting Q and Z to be global variables, and we can arrange for that to be the case in the multiprocessing version by using the initializer and initargs arguments when constructing the pool. You should also move code that does not need to be executed by the sub-processes into the block controlled by if __name__ == '__main__':, such as the array initialization code. These changes result in:

import numpy as np
import multiprocessing
import concurrent.futures

MULTIPROCESSING = True

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a, n):  # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index, X, Y):  # split large lists X and Y in n-even parts (n = cpu_count))
    cpu_count = multiprocessing.cpu_count()

    #create multiple chunks for X and Y and index:
    index_split = split(index,cpu_count)
    X_split = split(X,cpu_count)
    Y_split = split(Y,cpu_count)
    return index_split, X_split ,Y_split


def main():
    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z

    np.random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    index = [str(x) for x in list(range(N))]
    X = np.random.randint(M, size=N)
    Y = np.random.randint(M, size=N)

    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)

    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    X = np.char.mod('%d', X).tolist()
    Y = np.char.mod('%d', Y).tolist()
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()

    # for non-multiprocessing:
    if not MULTIPROCESSING:
        matchdict = matchdictfunc(index, X, Y)
    else:
        # for multiprocessing:
        # split lists to number of processors (cpu_count)
        index_split, X_split, Y_split = splitinput(index, X, Y)
        with concurrent.futures.ProcessPoolExecutor(initializer=init_pool, initargs=(Q, Z)) as executor:
            finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
            matchdict = {}
            for d in finallist:
                matchdict.update(d)

    #print(matchdict)

if __name__ == '__main__':
    main()

Note: I tried this for a smaller value of N = 1000 (printing out the results of matchdict) and the multiprocessing version seemed to return the same results. My machine does not have the resources to run with the full value of N without freezing up everything else.
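
If you want to check this programmatically rather than by eye, something along the following lines could be added (a sketch only; it assumes a small N such as 1000, the variables from main() above, and that Q and Z are also made available as module globals for the single-process call, as in the non-multiprocessing path):

# Sketch of a consistency check between the two code paths:
single_result = matchdictfunc(index, X, Y)

index_split, X_split, Y_split = splitinput(index, X, Y)
with concurrent.futures.ProcessPoolExecutor(initializer=init_pool, initargs=(Q, Z)) as executor:
    multi_result = {}
    for d in executor.map(matchdictfunc, index_split, X_split, Y_split):
        multi_result.update(d)

assert single_result == multi_result, "single-core and multiprocessing results differ"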

An Alternative Approach

I am working under the assumption that your DNA data is external and the X and Y values can be read n values at a time or can be read in and written out so that this is possible. Then rather than having all the data resident in memory and splitting it up into 32 pieces, I propose that it be read n values at a time and thus broken up into approximately N/n pieces.

In the following code I have switched to using the imap method from class multiprocessing.pool.Pool. The advantage is that it lazily submits tasks to the process pool, that is, the iterable argument doesn't have to be a list or convertible to a list. Instead the pool will iterate over the iterable sending tasks to the pool in chunksize groups. In the code below, I have used a generator function for the argument to imap, which will generate successive X and Y values. Your actual generator function would first open the DNA file (or files) and read in successive portions of the file.

import numpy as np
import multiprocessing

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(t):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    index, X, Y = t
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict


def next_tuple(n, stop, M):
    start = 0
    while True:
        end = min(start + n, stop)
        index = [str(x) for x in list(range(start, end))]
        x = np.random.randint(M, size=n)
        y = np.random.randint(M, size=n)
        # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
        x = np.char.mod('%d', x).tolist()
        y = np.char.mod('%d', y).tolist()
        yield (index, x, y)
        start = end
        if start >= stop:
            break

def compute_chunksize(XY_AT_A_TIME, N):
    n_tasks, remainder = divmod(N, XY_AT_A_TIME)
    if remainder:
        n_tasks += 1
    chunksize, remainder = divmod(n_tasks, multiprocessing.cpu_count() * 4)
    if remainder:
        chunksize += 1
    return chunksize


def main():
    np.random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)

    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()

    matchdict = {}
    # number of X, Y pairs at a time:
    # experiment with this, especially as N increases:
    XY_AT_A_TIME = 10000
    chunksize = compute_chunksize(XY_AT_A_TIME, N)
    #print('chunksize =', chunksize) # 32 with 8 cores
    with multiprocessing.Pool(initializer=init_pool, initargs=(Q, Z)) as pool:
        for d in pool.imap(matchdictfunc, next_tuple(XY_AT_A_TIME, N, M), chunksize):
            matchdict.update(d)
    #print(matchdict)

if __name__ == '__main__':
    import time
    t = time.time()
    main()
    print('total time =', time.time() - t)

Update

I want to eliminate the use of numpy from the benchmark. It is known that numpy uses multithreading for some of its operations, and this can be a cause of reduced performance when it is used in multiprocessing applications. So the first thing I did was to take the OP's original program and, where the code was, for example:

import numpy as np

np.random.seed(1)
X = np.random.randint(M, size=N)
X = np.char.mod('%d', X).tolist()

I replaced it with:

import random

random.seed(1)
X = [str(random.randrange(M)) for _ in range(N)]

I then timed the OP's program to get the time for generating the X, Y, Q and Z lists and the total time. On my desktop the times were approximately 20 seconds and 37 seconds respectively! So in my multiprocessing version, just generating the arguments for the process pool's processes is more than half the total running time. I also discovered, for the second approach, that as I increased the value of XY_AT_A_TIME the CPU utilization went down from 100% to around 50%, but the total elapsed time improved. I haven't quite figured out why this is.
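
The measurement itself was nothing elaborate; the following sketch shows how the argument-generation cost can be isolated (timer placement is illustrative, and the absolute numbers will differ from machine to machine):

import random
import time

random.seed(1)
N = 10000000
M = 300

t0 = time.perf_counter()
index = [str(x) for x in range(N)]
X = [str(random.randrange(M)) for _ in range(N)]
Y = [str(random.randrange(M)) for _ in range(N)]
Q = [str(random.randrange(M)) for _ in range(120000)]
Z = [str(random.randrange(M)) for _ in range(120000)]
print('argument generation:', time.perf_counter() - t0, 'seconds')  # roughly 20 seconds on my desktop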

Next I tried to emulate how the programs would function if they were reading the data in. So I wrote out 2 * N random integers to a file, temp.txt, modified the OP's program to initialize X and Y from the file, and then modified my second approach's next_tuple function as follows:

def next_tuple(n, stop, M):
    with open('temp.txt') as f:
        start = 0
        while True:
            end = min(start + n, stop)
            index = [str(x) for x in range(start, end)] # improvement
            x = [f.readline().strip() for _ in range(n)]
            y = [f.readline().strip() for _ in range(n)]
            yield (index, x, y)
            start = end
            if start >= stop:
                break
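
For completeness, the temp.txt file read above can be produced with something like the following (a sketch; the one-value-per-line text format is an assumption inferred from the readline calls in next_tuple):

import random

random.seed(1)
N = 10000000
M = 300

# Write 2 * N random integers, one value per line; next_tuple above then
# consumes them in alternating blocks of n lines for X and for Y.
with open('temp.txt', 'w') as f:
    for _ in range(2 * N):
        f.write(str(random.randrange(M)) + '\n')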

Again, as I increased XY_AT_A_TIME the CPU utilization went down (the best performance I found was with a value of 400000, at which CPU utilization was only around 40%).

I finally rewrote my first approach trying to be more memory efficient (see below). This updated version again reads the random numbers from a file but uses generator functions for X, Y and index, so I don't need memory for both the full lists and the splits. Again, I do not expect identical results for the multiprocessing and non-multiprocessing versions because of the way I am assigning the X and Y values in the two cases (a simple solution would have been to write the random numbers to an X-value file and a Y-value file and read the values back from those two files), but this has no effect on the running times. And again, despite using the default pool size of 8, the CPU utilization was only 30 - 40% (it fluctuated quite a bit) and the overall running time was nearly double the non-multiprocessing running time. But why?

import random
import multiprocessing
import concurrent.futures
import time

MULTIPROCESSING = True

POOL_SIZE = multiprocessing.cpu_count()

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a):  # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts

def splitinput(index, X, Y):  # split large lists X and Y in n-even parts (n = POOL_SIZE)
    #create multiple chunks for X and Y and index:
    index_split = split(index)
    X_split = split(X)
    Y_split = split(Y)
    return index_split, X_split ,Y_split


def main():
    global N

    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z

    random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = [str(random.randrange(M)) for _ in range(120000)]
    Z = [str(random.randrange(M)) for _ in range(120000)]

    with open('temp.txt') as f:
        # for non-multiprocessing:
        if not MULTIPROCESSING:
            index = [str(x) for x in range(N)]
            X = [f.readline().strip() for _ in range(N)]
            Y = [f.readline().strip() for _ in range(N)]
            matchdict = matchdictfunc(index, X, Y)
        else:
            # for multiprocessing:
            # split lists to number of processors (POOL_SIZE)
            # generator functions:
            index = (str(x) for x in range(N))
            X = (f.readline().strip() for _ in range(N))
            Y = (f.readline().strip() for _ in range(N))
            index_split, X_split, Y_split = splitinput(index, X, Y)
            with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q, Z)) as executor:
                finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
                matchdict = {}
                for d in finallist:
                    matchdict.update(d)


if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)

Resolution?

Can it be that the overhead of transferring the data from the main process to the subprocesses, which involves shared memory reading and writing, is what is slowing everything down? So this final version was an attempt to eliminate that potential cause of the slowdown. On my desktop I have 8 processors. For the first approach, dividing the N = 10000000 X and Y values among them means that each process should be processing N // 8 -> 1250000 values. So I wrote out the random numbers in 16 groups of 1250000 numbers (8 groups for X and 8 groups for Y) as a binary file, noting the offset and length of each of these 16 groups, using the following code:

import random

random.seed(1)

with open('temp.txt', 'wb') as f:
    offsets = []
    for i in range(16):
        n = [str(random.randrange(300)) for _ in range(1250000)]
        b = ','.join(n).encode('ascii')
        l = len(b)
        offsets.append((f.tell(), l))
        f.write(b)

print(offsets)

And from that I constructed lists X_SPECS and Y_SPECS that the worker function matchdictfunc can use for reading in the X and Y values itself as needed. So now, instead of passing 1250000 values at a time to this worker function, we just pass indices 0, 1, ... 7 to it so that it knows which group it has to read in. Shared memory access has been totally eliminated for X and Y (it is still required for Q and Z) and the disk access has moved to the process pool. The CPU utilization will, of course, not be 100% because the worker function is doing I/O. But I found that while the multiprocessing running time has now been greatly improved, it still offers no improvement over the original non-multiprocessing version:

OP's original program modified to read `X` and `Y` values in from file: 26.2 seconds
Multiprocessing elapsed time: 29.2 seconds

In fact, when I changed the code to use multithreading by replacing the ProcessPoolExecutor with a ThreadPoolExecutor, the elapsed time went down by almost another second, demonstrating that there is very little contention for the Global Interpreter Lock within the worker function, i.e. most of the time is being spent in C-language code. The main work is done by:

matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]

When we do this with multiprocessing, we have multiple list comprehensions and multiple zip operations (on smaller lists) being performed by separate processes and we then assemble the results in the end. This is conjecture on my part, but there just may not be any performance gains to be had by taking what are already efficient operations and scaling them down across multiple processors. Or in other words, I am stumped and that was my best guess.

The final version (with some additional optimizations -- please note):

import random
import concurrent.futures
import time

POOL_SIZE = 8

X_SPECS = [(0, 4541088), (4541088, 4541824), (9082912, 4540691), (13623603, 4541385), (18164988, 4541459), (22706447, 4542961), (27249408, 4541847), (31791255, 4542186)]
Y_SPECS = [(36333441, 4542101), (40875542, 4540120), (45415662, 4540802), (49956464, 4540971), (54497435, 4541427), (59038862, 4541523), (63580385, 4541571), (68121956, 4542335)]

def init_pool(q_z):
    global Q_Z
    Q_Z = q_z

def matchdictfunc(index, i):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    x_offset, x_len = X_SPECS[i]
    y_offset, y_len = Y_SPECS[i]
    with open('temp.txt', 'rb') as f:
        f.seek(x_offset, 0)
        X = f.read(x_len).decode('ascii').split(',')
        f.seek(y_offset, 0)
        Y = f.read(y_len).decode('ascii').split(',')

    lookup = {}
    for i, (q, z) in enumerate(Q_Z):
        lookup.setdefault((q, z), []).append(i)

    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a):  # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts



def main():
    global N

    random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = (str(random.randrange(M)) for _ in range(120000))
    Z = (str(random.randrange(M)) for _ in range(120000))
    Q_Z = list(zip(Q, Z)) # pre-compute the `zip` function

    # for multiprocessing:
    # split lists to number of processors (POOL_SIZE)
    # generator functions:
    index = (str(x) for x in range(N))
    index_split = split(index)
    with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
        finallist = executor.map(matchdictfunc, index_split, range(8))
        matchdict = {}
        for d in finallist:
            matchdict.update(d)

    print(len(matchdict))

if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)
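
For reference, the multithreaded variant mentioned earlier differs only in the executor class that is used; a minimal sketch of the change, with the rest of main() assumed unchanged:

# Sketch: thread-based variant. ThreadPoolExecutor also accepts
# initializer/initargs; with threads the "globals" simply live in the one
# shared process, so Q_Z is not pickled and sent to workers.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
    finallist = executor.map(matchdictfunc, index_split, range(8))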

The Cost of Transferring Memory Between Processes

In the code below, the function create_files was called to create 100 identical files, each consisting of a "pickled" list of 1,000,000 numbers. I then used a multiprocessing pool of size 8 twice to read the 100 files and unpickle them to reconstitute the original lists. The difference between the first case (worker1) and the second case (worker2) is that in the second case the list is returned to the caller (but not saved, so that the memory can be garbage collected immediately). The second case took more than three times longer than the first case. This can also explain, in part, why you do not see a speedup when you switch to multiprocessing.

from multiprocessing import Pool
import pickle
import time

def create_files():
    l = [i for i in range(1000000)]
    # create 100 identical files:
    for file in range(1, 101):
        with open(f'pkl/test{file}.pkl', 'wb') as f:
            pickle.dump(l, f)


def worker1(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)


def worker2(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)
    return file_name, obj

POOLSIZE = 8

if __name__ == '__main__':
    #create_files()

    pool = Pool(POOLSIZE)
    t = time.time()
    # no data returned:
    for file in range(1, 101):
        pool.apply_async(worker1, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)

    pool = Pool(POOLSIZE)
    t = time.time()
    for file in range(1, 101):
        pool.apply_async(worker2, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)

    t = time.time()
    for file in range(1, 101):
        worker2(file)
    print(time.time() - t)
