Working with a deque object across multiple processes

Problem description

I'm trying to reduce the processing time of reading a database with roughly 100,000 entries, which I need formatted in a specific way. To do this, I tried Python's multiprocessing Pool.map function, which works perfectly except that I can't get any form of queue reference to work across the worker processes.

I've been using the information in "Filling a queue and managing multiprocessing in python" as a guide for using queues across multiple processes, and "Using a global variable with a thread" as a guide for using global variables across threads. I've gotten the code to run, but when I check the length of the list/queue/dict/map after running the process, it always returns zero.

I've written a simple example to show what I mean. Note that you have to run it as a script file; the Pool initializer function does not work from the interactive interpreter.

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):
        print(global_q.pop())

In theory, when I pass the queue object reference from the main process to the worker processes through the Pool initializer, and that initializer stores it in each worker's global variable, then the elements I later insert from the map function should go into the original queue object. Long story short, everything should end up in the same queue, because every reference points to the same location in memory.

So I expect:

Hello world
Hello world
Hello world
0
1
2

Of course, the 0, 1, 2 can appear in any order, but what I actually get is empty output.

Why does nothing happen when I pass an object reference to the Pool initializer?

Recommended answer

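Here's an example of how to share an object between processes by extending the multiprocessing.managers.BaseManager class to support deques. The original version prints nothing because each worker process receives its own copy of global_q (the initializer arguments are either pickled into the new process or inherited through fork into a separate address space), so the workers' appends never reach the parent's deque. A manager instead keeps a single deque in its own server process and hands out proxies that forward method calls to it.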

The documentation has a section on customized managers.
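The copying behaviour behind the empty output can be observed with a small sketch. It is not part of the original answer, and the names `worker_q`, `init_worker`, `append_and_report` and `run_demo` are illustrative. Each worker appends to the deque it received through the initializer and returns what its own copy contains; afterwards the parent's deque is still empty, and each returned list holds only items handled by that one worker. Like the question's script, it has to be run as a file.

```python
from collections import deque
from multiprocessing import Pool

worker_q = None  # Per-process global; each worker process has its own binding.


def init_worker(q):
    # q is this worker's private copy of the parent's deque, not a shared reference.
    global worker_q
    worker_q = q


def append_and_report(i):
    worker_q.append(i)
    return list(worker_q)  # Snapshot of this worker's copy after the append.


def run_demo():
    parent_q = deque()
    with Pool(3, init_worker, (parent_q,)) as pool:
        seen_by_workers = pool.map(append_and_report, range(6))
    return parent_q, seen_by_workers


if __name__ == "__main__":
    parent_q, seen = run_demo()
    print("parent deque:", list(parent_q))  # Stays empty: no worker touched it.
    print("worker views:", seen)            # Each snapshot comes from one worker's copy.
```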

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.


def my_init(q):
    global process_shared_deque  # Initialize module-level global.
    process_shared_deque = q
    q.append("Hello world")

def map_fn(i):
    process_shared_deque.append(i)  # deques don't have a put() method.


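if __name__ == "__main__":
    # Used as a context manager, the manager starts its server process
    # and shuts it down when the block exits.
    with DequeManager() as manager:
        shared_deque = manager.DequeProxy()

        with Pool(3, my_init, (shared_deque,)) as pool:
            pool.map(map_fn, range(3))
            pool.close()  # No more tasks; let the workers exit normally.
            pool.join()   # Wait for every worker, including its initializer.

        for _ in range(len(shared_deque)):  # Show left-to-right contents.
            print(shared_deque.popleft())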

Output:

Hello world
0
1
2
Hello world
Hello world
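When a full deque interface isn't needed, the server started by `multiprocessing.Manager()` already offers proxied `list`, `dict` and `Queue` types, so no custom registration is required. The sketch below is an alternative, not the answer's method: it replaces the custom `DequeProxy` with a managed `list`, and `init_worker`, `map_fn` and `run_with_managed_list` are illustrative names. It calls `close()` and `join()` before leaving the `with Pool(...)` block, because `Pool.__exit__` calls `terminate()`, which could stop a worker before its initializer has appended "Hello world".

```python
from multiprocessing import Manager, Pool

shared = None  # Per-process global holding the list proxy.


def init_worker(shared_list):
    global shared
    shared = shared_list  # A proxy: method calls are forwarded to the manager process.
    shared.append("Hello world")


def map_fn(i):
    shared.append(i)


def run_with_managed_list(n_workers=3, n_items=3):
    with Manager() as manager:
        shared_list = manager.list()
        with Pool(n_workers, init_worker, (shared_list,)) as pool:
            pool.map(map_fn, range(n_items))
            pool.close()  # No more tasks will be submitted.
            pool.join()   # Every worker has run its initializer and exited.
        # Copy the contents out before the manager shuts down.
        return list(shared_list)


if __name__ == "__main__":
    for item in run_with_managed_list():
        print(item)
```

The trade-off is the interface: a managed list gives `append` and indexing, but popping from the left is O(n), so the custom deque manager remains the better fit when the data is consumed as a queue.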
