Sharing many queues among processes in Python


Question

I am aware of multiprocessing.Manager() and how it can be used to create shared objects, in particular queues which can be shared between workers. There is this question, this question, this question and even one of my own questions.

However, I need to define a great many queues, each of which is linking a specific pair of processes. Say that each pair of processes and its linking queue is identified by the variable key.

I want to use a dictionary to access my queues when I need to put and get data. I cannot make this work. I've tried a number of things. With multiprocessing imported as mp:

Defining a dict like for key in all_keys: DICT[key] = mp.Queue() in a config file which is imported by the multiprocessing module (call it multi.py) does not return errors, but the queue DICT[key] is not shared between the processes; each one seems to have its own copy of the queue and thus no communication happens.
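
Roughly, that config module looked like this (a minimal sketch; the module name and all_keys are placeholders standing in for the real config):

# multi.py -- module-level queues (the attempt described above)
import multiprocessing as mp

all_keys = ["a", "b", "c"]   # stand-in for the real keys

DICT = {}
for key in all_keys:
    # As described above, each worker ended up operating on its own
    # copy of these queues, so no communication happened.
    DICT[key] = mp.Queue()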

If I try to define the DICT at the beginning of the main multiprocessing function that defines the processes and starts them, like

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()

I get the error

RuntimeError: Queue objects should only be shared between processes through
 inheritance

Changing this to

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

only makes everything worse. Trying similar definitions at the head of multi.py rather than inside the main function returns similar errors.

There must be a way to share many queues between processes without explicitly naming each one in the code. Any ideas?

Edit

Here is the basic architecture of the program:

1- load the first module, which defines some variables, imports multi, launches multi.main(), and loads another module which starts a cascade of module loads and code execution. Meanwhile...

2- multi.main looks like this:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1, (DICT1[key],))   # DICT1 is defined in the config file
        proc_2 = pool.apply_async(targ2, (DICT2[key], otherargs,))

我不是使用poolmanager,而是使用以下命令启动进程:

Rather than use pool and manager, I was also launching processes with the following:

mp.Process(target=targ1, args=(DICT[key],))

3 - The function targ1 takes input data that is coming in (sorted by key) from the main process. It is meant to pass the result to DICT[key] so targ2 can do its work. This is the part that is not working. There are an arbitrary number of targ1s, targ2s, etc. and therefore an arbitrary number of queues.

4 - The results of some of these processes will be sent to a bunch of different arrays / pandas dataframes which are also indexed by key, and which I would like to be accessible from arbitrary processes, even ones launched in a different module. I have yet to write this part and it might be a different question. (I mention it here because the answer to 3 above might also solve 4 nicely.)

Answer

It sounds like your issues started when you tried to share a multiprocessing.Queue() by passing it as an argument. You can get around this by creating a managed queue instead:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()

When you use a manager to create it, you are storing and passing around a proxy to the queue, rather than the queue itself, so even when the object you pass to your worker processes is copied, it will still point at the same underlying data structure: your queue. It's very similar (in concept) to pointers in C/C++. If you create your queues this way, you will be able to pass them when you launch a worker process.
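
For example, a managed queue can be handed straight to a worker process as an argument (a minimal standalone sketch; the worker function is made up for illustration):

import multiprocessing as mp

def worker(q):
    # q is a proxy; put()/get() act on the one shared queue held by the manager
    q.put("hello from the worker")

if __name__ == "__main__":
    manager = mp.Manager()
    q = manager.Queue()                       # proxy object, safe to pass around
    p = mp.Process(target=worker, args=(q,))
    p.start()
    p.join()
    print(q.get())                            # prints "hello from the worker"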

Since you can pass queues around now, you no longer need your dictionary to be managed. Keep a normal dictionary in main that will store all the mappings, and only give your worker processes the queues they need, so they won't need access to any mappings.

I've written an example of this here. It looks like you are passing objects between your workers, so that's what's done here. Imagine we have two stages of processing, and the data both starts and ends in the control of main. Look at how we can create queues that connect the workers like a pipeline, but by giving them only the queues they need, there's no need for them to know about any mappings:

import multiprocessing as mp

def stage1(q_in, q_out):
    q_out.put(q_in.get() + "Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):
    q_out.put(q_in.get() + "Stage 2 did some work.\n")
    return

def main():

    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

    return

if __name__ == "__main__":
    main()

The code produces this output:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

I didn't include an example of storing the queues or AsyncResults objects in dictionaries, because I still don't quite understand how your program is supposed to work. But now that you can pass your queues freely, you can build your dictionary to store the queue/process mappings as needed.
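
As a rough sketch of one way such a mapping could look (the keys and worker bodies here are placeholders, not your actual targ1/targ2):

import multiprocessing as mp

def targ1(q_out, key):
    # placeholder first-stage worker: produce a result for this key
    q_out.put("result for " + key)

def targ2(q_in, key):
    # placeholder second-stage worker: consume the result for this key
    print(key, "->", q_in.get())

if __name__ == "__main__":
    all_keys = ["a", "b"]                                  # placeholder keys
    manager = mp.Manager()
    pool = mp.Pool()

    queues = {key: manager.Queue() for key in all_keys}   # plain dict, lives only in main
    results = {}

    for key in all_keys:
        # each worker receives only the single queue it needs, never the dict
        results[key] = (pool.apply_async(targ1, (queues[key], key)),
                        pool.apply_async(targ2, (queues[key], key)))

    pool.close()
    pool.join()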

In fact, if you really do build a pipeline between multiple workers, you don't even need to keep a reference to the "inter-worker" queues in main. Create the queues, pass them to your workers, then only retain references to queues that main will use. I would definitely recommend trying to let old queues be garbage collected as quickly as possible if you really do have "an arbitrary number" of queues.
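
For instance, building on the stage1/stage2 example above, a small helper along these lines would leave main holding only the queues it actually reads and writes (a sketch, not a drop-in for your program):

def launch_pair(pool, manager):
    q_in = manager.Queue()     # main -> stage 1
    q_mid = manager.Queue()    # stage 1 -> stage 2; main never keeps this one
    q_out = manager.Queue()    # stage 2 -> main
    pool.apply_async(stage1, (q_in, q_mid))
    pool.apply_async(stage2, (q_mid, q_out))
    return q_in, q_out         # q_mid goes out of scope here and can be collected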

