Why is `multiprocessing.Queue.get` so slow?


Question

I need help understanding multiprocessing.Queue. The problem I'm facing is that results obtained from queue.get(...) lag hilariously behind the calls to queue.put(...) and the queue's buffer (the deque).

This leaking abstraction led me to investigate the queue's internals. Its straightforward source code just points me to the deque implementation, and that also seems simple enough that I cannot use it to explain the behavior I'm seeing. I also read that Queue uses pipes, but I can't seem to find that in the source code.
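The pipe is easy to miss because mp.Queue creates it indirectly: put() only appends to an in-process buffer, and a background feeder thread pickles items from that buffer into a pipe. A minimal sketch that pokes at these internals (CPython private attributes, not public API, subject to change):

```python
import multiprocessing as mp

q = mp.Queue()
q.put([0])  # put() itself only appends to the in-process buffer

# CPython implementation details: items wait in a collections.deque
# (q._buffer) until a feeder thread pickles them into the write end of
# an OS pipe (q._writer); get() reads from the other end (q._reader).
print(type(q._buffer).__name__)        # the in-process deque buffer
print(q._reader, q._writer, sep='\n')  # the two Connection ends of the pipe
```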

I've boiled it down to a minimal example reproducing the problem, and I show a possible output below it.

import threading
import multiprocessing
import queue

q = None


def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        # threading.Thread or multiprocessing.Process -- it doesn't matter
        process = threading.Thread(target=worker, args=(q,))
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # drain all currently available items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {local_queue.qsize()}. buffer len: {len(local_queue._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))

Output:

empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28

I want you to notice the following about the result: after inserting element 28001, the worker finds that there are no elements left in the queue, whereas there are dozens more. Because of synchronization, I'm okay with it getting all but a few of them. But it only manages to find two!

And this pattern continues.

This seems to have to do with the size of the objects I put on the queue. For small objects, say i as opposed to list(range(i)), the problem does not appear. But the objects we're talking about are still only kilobytes in size, nowhere near large enough to justify such significant delays (in my real-world, non-minimal example this easily took minutes).

My question specifically is: how can I share (not so) large amounts of data between processes in Python? Additionally, I'd like to know where in Queue's internal implementation this sluggishness comes from.

Answer

I met this problem too. I was sending large numpy arrays (~300 MB), and it was very slow at mp.Queue.get().

After looking into the Python 2.7 source code of mp.Queue, I found that the slowest part (on Unix-like systems) is _conn_recvall() in socket_connection.c, but I did not dig deeper.

To work around the problem, I built an experimental package, FMQ.

This project is inspired by the use of multiprocessing.Queue (mp.Queue). mp.Queue is slow for large data items because of the speed limitation of the pipe (on Unix-like systems).

With mp.Queue handling the inter-process transfer, FMQ implements a stealer thread that steals an item from the mp.Queue as soon as one is available and puts it into a Queue.Queue. The consumer process can then fetch data from the Queue.Queue immediately.

The speed-up is based on the assumption that both the producer and consumer processes are compute-intensive (hence multiprocessing is necessary) and the data items are large (e.g. > 50 227x227 images). Otherwise, mp.Queue with multiprocessing or Queue.Queue with threading is good enough.

fmq.Queue is used just like an mp.Queue.

Note that there are still some known issues, as this project is at an early stage.

