Trouble using a lock with multiprocessing.Pool: pickling error


Question

I'm building a python module to extract tags from a large corpus of text, and while its results are high quality it executes very slowly. I'm trying to speed the process up by using multiprocessing, and that was working too, until I tried to introduce a lock so that only one process was connecting to our database at a time. I can't figure out for the life of me how to make this work - despite much searching and tweaking I am still getting a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here's the offending code - it worked fine until I tried to pass a lock object as an argument for f.

def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize 
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock) 

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

Recommended answer

Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.

  • To avoid this, you can make your lock variable a global variable. Then you will be able to reference it within your pool process function directly as a global variable, and will not have to pass it as an argument to the pool process function. This works on Unix-like systems where Python creates the pool processes with the OS fork mechanism, because fork copies the entire contents of the parent process, including the lock, into each worker. The lock must therefore be created before the pool. A lock cannot be sent to an already running worker as a task argument; it has to be inherited when the worker process is created, either through fork as here or through the Pool's initializer and initargs parameters. Incidentally, it is not necessary to use the Manager class just for this lock. With this change your code would look like this:

import multiprocessing
from functools import partial

lock = None  # Global definition of lock
pool = None  # Global definition of pool


def get_more_tags(tag, max_tags=2):
    # Placeholder for the expensive function from the question. It reaches
    # the lock through the module-level global instead of an argument.
    with lock:
        pass  # the database query goes here
    return []


def make_network(initial_tag, max_tags=2, max_iter=3):
    global lock
    global pool
    # Create the lock before the pool so that the forked workers inherit it.
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(8)

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    f = partial(get_more_tags, max_tags=max_tags)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

In your real code, it is possible that the lock and pool variables might be class instance variables.

  • A second solution, which avoids locks altogether but might have slightly higher overhead, is to create another process with multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process would be responsible for running your database queries. Your pool processes would use the queue to send query parameters to it. Since all the pool processes would use the same queue, access to the database would automatically be serialized. The additional overhead would come from pickling and unpickling the query arguments and the query responses. Note that, like a lock, a multiprocessing.Queue cannot be passed to a pool worker as a task argument; the workers have to inherit it (as a global under fork, or through the Pool's initializer), or you can use a Manager().Queue(), whose proxy is picklable. Note also that the global multiprocessing.Lock approach above would not work on Windows, where processes are not created with fork semantics.
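
The queue-based design could be sketched roughly as follows. This is one possible arrangement under stated assumptions, not the answerer's code: fake_query is a hypothetical stand-in for the real database query, and the per-worker reply queues (claimed through a slots queue in the initializer) are an assumption about how responses get back to the right worker, which the answer does not specify.

```python
import multiprocessing

_requests = None  # queue to the database process (set in each worker)
_replies = None   # this worker's private reply queue
_slot = None      # index of this worker's reply queue


def fake_query(tag):
    """Hypothetical stand-in for the real database query (an assumption)."""
    return [tag + "-related"]


def db_server(requests, reply_queues):
    """Runs in its own process and handles one request at a time, so
    database access is serialized without any lock."""
    while True:
        item = requests.get()
        if item is None:  # sentinel: no more work
            break
        slot, tag = item
        reply_queues[slot].put(fake_query(tag))


def _init_worker(requests, reply_queues, slots):
    """Runs once per pool worker: claim a private reply queue."""
    global _requests, _replies, _slot
    _slot = slots.get()
    _requests = requests
    _replies = reply_queues[_slot]


def get_more_tags(tag):
    """Pool task: ask the database process, then wait for its answer."""
    _requests.put((_slot, tag))
    return _replies.get()


def run(tags, processes=2):
    requests = multiprocessing.Queue()
    reply_queues = [multiprocessing.Queue() for _ in range(processes)]
    slots = multiprocessing.Queue()
    for i in range(processes):
        slots.put(i)
    server = multiprocessing.Process(
        target=db_server, args=(requests, reply_queues)
    )
    server.start()
    # The queues reach the workers through initargs, not as task arguments.
    pool = multiprocessing.Pool(
        processes,
        initializer=_init_worker,
        initargs=(requests, reply_queues, slots),
    )
    try:
        return pool.map(get_more_tags, tags)
    finally:
        pool.close()
        pool.join()
        requests.put(None)  # tell the database process to stop
        server.join()


if __name__ == "__main__":
    print(run(["python", "locks", "pickle"]))
```

Each request and each response is pickled once on its way through a queue, which is where the extra overhead mentioned above comes from. The sketch also assumes workers are never replaced during the run, since a replacement worker would find no free slot to claim.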
