Python多处理-进程之间的管道通信 [英] Python multiprocessing - pipe communication between processes

查看:289
本文介绍了Python多处理-进程之间的管道通信的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做一个从客户端传感器收集数据,处理收集的数据并将其发送回客户端的项目.可能有多个客户端要求同时从我们的服务器接收一些数据,因此我必须实现多处理.我不能使用线程,因为某些变量必须与客户端无关.如果这样做的话,我的代码可能会变得非常复杂,以致于无法阅读和升级,我也不想这么做.因此,我决定使用流程,但是现在在父流程和子流程之间需要剪切一些数据.经过研究,我发现管道通信可以满足我的要求.

I am making a project which collects data from clients sensors, processes the gathered data and sends it back to clients. There can be multiple clients asking to receive some data from our server at the same time, so I had to implement multiprocessing. I can't use Threads because there are certain variables that must be client independent. If I did, my code would probably get very complicated to read and upgrade, and I don't want that. So I decided to use Processes, but now there is some data that needs to be sheared between parent and child Processes. After some research, I found that Pipe communication would satisfy my requirements.

以下代码成功地将数据从父级发送到子进程,子级更新数据并将其发送回父级.但这仅是因为sleep()函数阻止了父级与子级同时使用管道.

The following code successfully sends data from parent to child Process, child updates the data and sends it back to the parent. But it is working only because of the sleep() function that stops the parent from using the pipe at the same time as the child.

如何更改它以使其相同,但是如果没有sleep()函数,我相信它将来很可能会引起问题?

How can it be changed so it does the same, but without the sleep() function for which I believe it will most probably cause problems in the future?

from multiprocessing import Process, Pipe
import time

def update_data(pipe):
    p_out, p_in = pipe
    L = []
    while True:
        message = p_out.recv()
        if message=='FINISHED':
            break       
        L.append(message)      

    L.append(['new data'])       #updating received data
    writer(L, p_in)              #sending received data to parent Process
    p_in.close()

def writer(i, p_in):
    p_in.send(i)
    p_in.send('FINISHED')

L = ['0' for i in range(10)]     #current data
if __name__=='__main__':
    p_out, p_in = Pipe()
    update_data_process = Process(target=update_data, args=((p_out, p_in),))
    update_data_process.start()    
    writer(L, p_in)              #sending current data to child Process
    time.sleep(3)                #needs to be changed
    while True:
        message = p_out.recv()
        if message != 'FINISHED':
            L = message
        else:
            break
    print(L)
    p_in.close()
    update_data_process.join()

推荐答案

您遇到了问题,因为您对待连接的方式就像是 simplex ,但是默认情况下Pipe()返回双向(双向)连接. 这意味着当您呼叫parent_conn, child_conn = Pipe()时,您会得到一个连接,仅 父母应将其用于读取 写入另一个子级的此类连接对象.父级子级仅对连接对象起作用.

You have the issue because you are treating the connections like if they were simplex, but Pipe() by default returns duplex (two-way) connections. This means when you call parent_conn, child_conn = Pipe(), you get one connection, only the parent should use for reads and writes and another such connection object for the child. Parent and child only operate upon their connection objects.

from multiprocessing import Process, Pipe
from datetime import datetime

SENTINEL = 'SENTINEL'


def update_data(child_conn):

    result = []

    for msg in iter(child_conn.recv, SENTINEL):
        print(f'{datetime.now()} child received {msg}')
        result.append(msg)

    print(f'{datetime.now()} child received sentinel')
    result.append(['new data'])
    writer(child_conn, result)
    child_conn.close()


def writer(conn, data):
    conn.send(data)
    conn.send(SENTINEL)


if __name__=='__main__':

    parent_conn, child_conn = Pipe()  # default is duplex!
    update_data_process = Process(target=update_data, args=(child_conn,))
    update_data_process.start()

    data = ['0' for i in range(3)]
    writer(parent_conn, data)

    for msg in iter(parent_conn.recv, SENTINEL):
        print(f'{datetime.now()} parent received {msg}')

    print(f'{datetime.now()} parent received sentinel')
    parent_conn.close()
    update_data_process.join()

示例输出:

2019-03-12 00:09:52.920375 child received ['0', '0', '0']
2019-03-12 00:09:52.920512 child received sentinel
2019-03-12 00:09:52.920702 parent received [['0', '0', '0'], ['new data']]
2019-03-12 00:09:52.920764 parent received sentinel

Process finished with exit code 0

如果您不熟悉iter(callable, sentinel)的用法,请在此处.

In case you are unfamiliar with the use of iter(callable, sentinel), look here.

这篇关于Python多处理-进程之间的管道通信的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆