Can a set() be shared between Python processes?

Problem description

I am using multiprocessing in Python 2.7 to process a very large set of data. As each process runs, it adds integers to a shared mp.Manager.Queue(), but only if some other process hasn't already added the same integer. Since you can't do an "in"-style membership test for Queues, the way I'm doing it is to check each int for membership in a shared mp.Manager.list(). The list will eventually have ~30 million entries, and so membership tests will be extremely slow, nullifying the advantage of multiprocessing.

Here's a much simplified version of what I'm doing:

import multiprocessing as mp

def worker(shared_list, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the list in the meantime
    lock.acquire()
    # This lookup can take forever when the list is large
    if result_int not in shared_list:
        out_q.put(result_int)
        shared_list.append(result_int)
    lock.release()

manager = mp.Manager()
shared_list = manager.list()
lock = manager.Lock()
out_q = manager.Queue()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_list, out_q, lock))
    p.start()

I previously tried using a set() instead of an mp.Manager.list(), but it seems that each process has its own memory space, and so when I updated the set, it didn't synchronize across processes. Hence, I switched to the current approach.

Here's roughly how I previously tried using a set():

import multiprocessing as mp

def worker(shared_set, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the set in the meantime
    lock.acquire()
    # This lookup is fast, but the set doesn't reflect additions made by other processes.
    if result_int not in shared_set:
        out_q.put(result_int)
        shared_set.add(result_int)
    lock.release()

manager = mp.Manager()
lock = manager.Lock()
out_q = manager.Queue()

# This set will NOT synchronize between processes
shared_set = set()


for i in range(8):
    p = mp.Process(target=worker, args=(shared_set, out_q, lock))
    p.start()

Note: these examples are untested and simply represent the relevant parts of my code.

Is there a way to share sets across processes, or otherwise do faster membership lookups?

A little more information: the out_q is consumed by another process which writes the data to a single output file. There can be no duplicates. If I generate an integer and it's found to be a duplicate, the process needs to go back and generate the next-best integer.

Answer

An obvious tweak is to use an mp.Manager.dict() instead of the set, and use arbitrary values (say, set the_dict[result_int] = 1 to indicate membership in the set). BTW, this is how "everyone" implemented sets before Python added the set type, and even now dicts and sets are implemented by basically the same code under the covers.
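
For example, a minimal sketch of that substitution into the question's worker (keeping the original queue and lock; some_other_code() is the question's own placeholder) might look like:

def worker(shared_dict, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # A managed dict proxy supports "in", so the membership test reads the same
    with lock:
        if result_int not in shared_dict:
            out_q.put(result_int)
            shared_dict[result_int] = 1   # arbitrary value; only the key matters

manager = mp.Manager()
shared_dict = manager.dict()
lock = manager.Lock()
out_q = manager.Queue()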

Added later: I confess I don't grasp why you used both a set and a list in the original code, since the set's keys are identical to the list's contents. If order of entry isn't important, why not forget the list entirely? Then you could also drop the layer of locking needed in the original to keep the set and the list in synch.

Fleshing that out with the dict suggestion, the whole function would become just:

def worker(shared_dict):
    # Do some processing and get an integer
    result_int = some_other_code()
    shared_dict[result_int] = 1

Other processes could do shared_dict.pop() then to get one value at a time (although, no, they couldn't wait on .pop() as they do for a queue's .get()).
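
As a hedged sketch of that idea (the workers_done event and the polling loop are my own illustrative assumptions, not part of the answer), such a consumer could use popitem() rather than pop(), since dict.pop() needs a key, and keep checking until the workers signal that they have finished:

import time

def dict_consumer(shared_dict, workers_done):
    results = []
    while True:
        try:
            result_int, _ = shared_dict.popitem()  # removes one (key, value) pair
            results.append(result_int)
        except KeyError:
            # The dict is currently empty; quit only once every worker is done
            if workers_done.is_set():
                break
            time.sleep(0.01)
    return results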

And one more: consider using local (process-local) sets? They'll run much faster. Then each worker won't add any duplicates it knows about, but there may be duplicates across processes. Your code didn't give any hints about what the out_q consumer does, but if there's only one then a local set in that too could weed out cross-process duplicates. Or perhaps the memory burden gets too high then? Can't guess from here ;-)
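
A hedged sketch of that split (some_other_code() and write_output() are placeholders, and the single-consumer assumption is mine): each worker filters against its own in-process set, and the lone consumer keeps a second plain set that drops whatever duplicates slip through across processes:

def worker(out_q):
    seen_locally = set()              # ordinary in-process set: fast lookups
    for _ in range(1000000):
        result_int = some_other_code()
        if result_int not in seen_locally:
            seen_locally.add(result_int)
            out_q.put(result_int)     # may still duplicate ints from other workers

def consumer(in_q):
    seen_everywhere = set()           # only one consumer, so a plain set suffices
    while True:
        result_int = in_q.get()
        if result_int not in seen_everywhere:
            seen_everywhere.add(result_int)
            write_output(result_int)  # placeholder for writing to the output file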

I'm going to suggest a different approach: don't use mp.Manager at all. Most times I see people use it, they regret it, because it's not doing what they think it's doing. What they think: it's supplying physically shared objects. What it's doing: it's supplying semantically shared objects. Physically, they live in Yet Another, under-the-covers, process, and operations on the objects are forwarded to that latter process, where they're performed by that process in its own address space. It's not physically shared at all. So, while it can be very convenient, there are substantial interprocess overheads for even the simplest operations.
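
A quick, hedged way to see that forwarding cost is to time the same stores against a managed dict and an ordinary dict in a single process (the exact numbers vary by machine; only the ratio matters):

import multiprocessing as mp
import time

if __name__ == "__main__":
    manager = mp.Manager()
    proxied = manager.dict()    # lives in the manager's server process
    plain = {}                  # lives in this process's own memory

    start = time.time()
    for i in range(10000):
        proxied[i] = 1          # each store is a round trip to the manager process
    print("manager.dict(): %.3f seconds" % (time.time() - start))

    start = time.time()
    for i in range(10000):
        plain[i] = 1            # ordinary in-process store
    print("plain dict:     %.3f seconds" % (time.time() - start))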

So I suggest instead using a single, ordinary set in one process, which will be the sole code concerned with weeding out duplicates. The worker processes produce ints with no concern for duplicates - they just pass the ints on. An mp.Queue is fine for that (again, no real need for an mp.Manager.Queue).

Like so, which is a complete executable program:

N = 20

def worker(outq):
    from random import randrange
    from time import sleep
    while True:    
        i = randrange(N)
        outq.put(i)
        sleep(0.1)

def uniqueifier(inq, outq):
    seen = set()
    while True:
        i = inq.get()
        if i not in seen:
            seen.add(i)
            outq.put(i)

def consumer(inq):
    for _ in range(N):
        i = inq.get()
        print(i)

if __name__ == "__main__":
    import multiprocessing as mp
    q1 = mp.Queue()
    q2 = mp.Queue()
    consume = mp.Process(target=consumer, args=(q2,))
    consume.start()
    procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
    for _ in range(4):
        procs.append(mp.Process(target=worker, args=(q1,)))
    for p in procs:
        p.start()
    consume.join()
    for p in procs:
        p.terminate()

The second queue passed to uniqueifier plays the role of your original queue: it delivers only unique integers. No attempt is made to "share memory", and so no costs due to that are paid. The only interprocess communication is via easy, explicit mp.Queue operations. There is only one set, and since it's not shared in any way it runs as fast as possible.

In effect, this just sets up a simple pipeline, although with multiple inputs.
