Python 3.4 多处理队列比管道快,出乎意料 [英] Python 3.4 multiprocessing Queue faster than Pipe, unexpected

查看:34
本文介绍了Python 3.4 多处理队列比管道快,出乎意料的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做一个从 udp 套接字接收样本的音频播放器,并且一切正常.但是当我实现了一个 Lost Concealment 算法时,播放器无法以例外的速率保持静音(每 10 毫秒发送一个包含多个 160 字节的列表).

I am doing an audio player that received samples from an udp socket, and everything was working fine. But when I implemented an Lost Concealment algorithm, the player failed to keep producing silence at the excepted rate (each 10ms send a list of multiple 160 bytes).

当使用 pyaudio 播放音频时,使用阻塞调用 write 来播放一些样本,我注意到它在样本的平均持续时间内被阻塞.所以我创建了一个新的专用进程来播放样本.

When playing audio with pyaudio, using the blocking call write to play some samples, I noticed it blocked on average for duration of the sample. So I created a new dedicated process to play the samples.

主进程处理音频的输出流,并使用 multiprocessing.Pipe 将结果发送到该进程.我决定使用 multiprocessing.Pipe 因为它应该比其他方式更快.

The main process processes the output stream of audio and sends the result to that process using a multiprocessing.Pipe . I decided to use the multiprocessing.Pipe because it was supposed to be faster than the other ways.

不幸的是,当我在虚拟机上运行该程序时,比特率是我在高速 PC 上获得的比特率的一半,这并没有达到目标比特率.

Unfortunately, when I runned the program on a virtual machine, the bitrate was half of what I was getting on my fast PC, which didnt fail to meet the target bitrate.

经过一些测试,我得出结论,导致延迟的原因是 Pipe 的函数 send.

After some tests, I concluded that what was causing the delay was the Pipe's function send.

我做了一个简单的基准测试脚本(见下文)来查看传输到进程的各种方法之间的差异.该脚本会持续发送 [b'\x00'*160] 5 秒,并计算字节对象总共发送了多少字节.我测试了以下发送方法:不发送"、multiprocessing.Pipe、multiprocessing.Queue、multiprocessing.Manager、multiprocessing.Listener/Client,最后是socket.socket:

I did a simple benchmark script (see below) to see the differences between the various methods of transmiting to a process. The script, keeps sending a [b'\x00'*160] constantly for 5 seconds, and counts how many bytes of the bytes object were sent in total. I tested the following methods of sending: "not sending", multiprocessing.Pipe, multiprocessing.Queue, multiprocessing.Manager, multiprocessing.Listener/Client and finally, socket.socket:

我的快速"PC 运行窗口 7 x64 的结果:

Results for my "fast" PC running window 7 x64:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

VirtualBox 的 VM 来宾运行 Windows 7 x64,主机运行 Windows 7 x64 的结果:

Results for the VirtualBox's VM guest running Windows 7 x64, host running Windows 7 x64:

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

使用的脚本:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

问题

  • 如果 Queue 使用 Pipe 在这种情况下它比 Pipe 快多少?这与问题 Python 多处理 - 管道 vs 队列
  • 相矛盾
  • 如何保证从一个进程到另一个进程的恒定比特率流,同时具有低发送延迟?
  • 在我的程序中,在尝试使用队列而不是管道之后.我得到了巨大的提升.

    Inside my program, after trying out with Queues instead of Pipes. I got an enormous boost.

    在我的电脑上,使用管道我得到了 +- 16000 B/s,使用队列我得到了 +-750 万 B/s.在虚拟机上,我从 +-13000 B/s 得到了 650 万 B/s.使用 Queue instread of Pipe 的字节数增加了大约 500 倍.

    On my computer, using Pipes I got +- 16000 B/s , using Queues I got +-7.5 Million B/s. On the virtual machine I got from +-13000 B/s to 6.5 Million B/s. Thats about 500 times more bytes using Queue instread of Pipe.

    当然我不会每秒播放数百万字节,我只会播放正常的声音速率.(在我的例子中是 16000 B/s,与上面的值一致).
    但关键是,我可以将速率限制为我想要的,同时仍有时间完成其他计算(例如从套接字接收、应用声音算法等)

    Of course I wont be playing millions of bytes per seconds, I will only be playing the normal rate for sound. (in my case 16000 B/s, coincidence with the value above).
    But the point is, I can limit the rate to what I want, while still having time to finish other computations (like receiving from sockets, applying sound algorithms, etc)

    推荐答案

    我不能肯定地说,但我认为您正在处理的问题是同步 I/O 还是异步 I/O.我的猜测是 Pipe 以某种方式结束了同步,而 Queue 结束了异步.为什么这个问题和答案可能会更好地回答一个以一种方式默认而另一种是另一种方式的原因:

    I can't say for sure, but I think the issue you're dealing with is synchronous versus asynchronous I/O. My guess is that the Pipe is somehow ending up synchronous and the Queue is ending up asynchronous. Why exactly one is defaulting one way and the other is the other might be better answered by this question and answer:

    python 管道的同步/异步行为

    这篇关于Python 3.4 多处理队列比管道快,出乎意料的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆