Python 多处理池队列通信 [英] Python multiprocessing Pool Queues communication

查看:70
本文介绍了Python 多处理池队列通信的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现一个由两个并行运行并通过队列进行通信的进程组成的池.

I'm trying to implement a pool of two processes that run in parallel and communicate through a queue.

目标是让 writer 进程使用 queue 将消息传递给 reader 进程.

The goal is to have a writer process that passes a message to a reader process by using a queue.

每个进程都在终端上打印反馈,以便得到反馈.

Each process is printing a feedback on the terminal in order to have a feedback.

代码如下:

#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
        msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()
    mp.freeze_support()

    queue = mp.Queue()

    pool = mp.Pool()
    pool.apply_async(writer, (queue)) 
    pool.apply_async(reader, (queue))

    pool.close()
    pool.join()

我期望的输出应该是这样的:

The output I am expecting should be something like this:

Initialize the experiment PID: 2341
writer 2342 -> 1
reader 2343 -> 1
writer 2342 -> 2
reader 2343 -> 2
writer 2342 -> 3
reader 2343 -> 3
Done

但是我只得到了这条线:

However I only get the line:

Initialize the experiment PID: 2341

然后脚本退出.

通过队列进行通信的池中两个进程实现进程间通信的正确方法是什么?

What is the correct way to implement the interprocess communication of two processes in a pool that communicates through a queue?

推荐答案

我使用了 mp.Manager().Queue() 作为队列,因为我们不能直接通过 Queue.尝试直接使用 Queue 会导致异常,但由于我们使用的是 apply_async,因此未得到处理.

I Used mp.Manager().Queue() as the queue because we couldn't directly pass Queue. Trying to directly use the Queue was causing exceptions but getting unhandled since we were using apply_async.

我将您的代码更新为:

#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
        msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()
    manager = mp.Manager()

    queue = manager.Queue()

    pool = mp.Pool()
    pool.apply_async(writer, (queue,))
    pool.apply_async(reader, (queue,))

    pool.close()
    pool.join()

我得到了这个输出:

Initialize the experiment PID:  46182
### writer  46210  ->  1
--- reader  46211  ->  1
### writer  46210  ->  2
--- reader  46211  ->  2
### writer  46210  ->  3
--- reader  46211  ->  3
### Done
--- reader  46211  ->  Done

我相信这正是您所期望的.

I believe this is what you expected.

这篇关于Python 多处理池队列通信的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆