python -> multiprocessing module


Problem Description

Here's what I am trying to accomplish -

  1. I have around a million files that I need to parse, appending the parsed content to a single file.
  2. Since a single process takes ages, that option is out.
  3. Not using threads in Python, because that essentially comes down to running a single process (due to the GIL).
  4. Hence using the multiprocessing module, i.e. spawning 4 sub-processes to utilize all that raw core power :)

So far so good. Now I need a shared object which all the sub-processes have access to. I am using Queues from the multiprocessing module. Also, all the sub-processes need to write their output to a single file, which is a potential place to use Locks, I guess. With this setup, when I run it, I do not get any error (so the parent process seems fine), it just stalls. When I press ctrl-C I see a traceback (one for each sub-process). Also, no output is written to the output file. Here's the code (note that everything runs fine without multiprocessing) -

import os
import glob
from multiprocessing import Process, Queue, Pool

# DATA_DIR, mine_imdb_page and multi_process are defined elsewhere in the original script
data_file = open('out.txt', 'w+')

def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return

def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop

    # this is the block of code that needs correction.
    if multi_process:
        # One way to spawn 4 processes
        # pool = Pool(processes=4) #Start worker processes
        # res  = pool.apply_async(worker, [task_queue, data_file])

        # But I chose to do it like this for now.
        for i in range(4):
            proc = Process(target=worker, args=[task_queue])
            proc.start()
    else: # single process mode is working fine!
        worker(task_queue)
    data_file.close()
    return

What am I doing wrong? I also tried passing the open file_object to each of the processes at the time of spawning, e.g. Process(target=worker, args=[task_queue, data_file]), but to no effect; it did not change anything. I feel the sub-processes are not able to write to the file for some reason. Either the instance of the file_object is not getting replicated (at the time of spawn) or there is some other quirk... Anybody got an idea?
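
For illustration (a minimal sketch, not from the original post): one common pattern for the setup described above is to put one 'STOP' sentinel on the task queue per worker and to funnel results back to the parent through a second Queue, so that only the parent process ever touches the output file. Here parse_file is a hypothetical stand-in for mine_imdb_page:

import glob
from multiprocessing import Process, Queue

NUM_WORKERS = 4

def parse_file(path):
    # placeholder for the real parsing logic (mine_imdb_page in the question)
    return path

def worker(task_queue, result_queue):
    # each worker consumes tasks until it sees its own 'STOP' sentinel
    for path in iter(task_queue.get, 'STOP'):
        data = parse_file(path)
        if data:
            result_queue.put(data)
    result_queue.put('DONE')  # tell the parent this worker has finished

def main():
    task_queue, result_queue = Queue(), Queue()
    for path in glob.glob('*.csv'):
        task_queue.put(path)
    for _ in range(NUM_WORKERS):
        task_queue.put('STOP')  # one sentinel per worker

    workers = [Process(target=worker, args=(task_queue, result_queue))
               for _ in range(NUM_WORKERS)]
    for p in workers:
        p.start()

    finished = 0
    with open('out.txt', 'w') as out:  # only the parent writes to the file
        while finished < NUM_WORKERS:
            item = result_queue.get()
            if item == 'DONE':
                finished += 1
            else:
                out.write(repr(item) + '\n')

    for p in workers:
        p.join()

if __name__ == '__main__':
    main()

Because the file handle lives only in the parent, nothing has to be replicated across the fork and no Lock is needed around the writes.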

EXTRA: Also Is there any way to keep a persistent mysql_connection open & pass it across to the sub_processes? So I open a mysql connection in my parent process & the open connection should be accessible to all my sub-processes. Basically this is the equivalent of a shared_memory in python. Any ideas here?

Recommended Answer

Although the discussion with Eric was fruitful, later on I found a better way of doing this. Within the multiprocessing module there is a class called 'Pool' which is perfect for my needs.

It optimizes itself to the number of cores my system has, i.e. only as many processes are spawned as there are cores. Of course this is customizable. So here's the code. Might help someone later -

import os
import glob
from multiprocessing import Pool

def main():
    po = Pool()  # defaults to one worker process per CPU core
    for file in glob.glob('*.csv'):
        filepath = os.path.join(DATA_DIR, file)
        po.apply_async(mine_page, (filepath,), callback=save_data)
    po.close()
    po.join()
    file_ptr.close()  # the output file opened elsewhere in the script

def mine_page(filepath):
    # do whatever it is that you want to do in a separate process.
    return data

def save_data(data):
    # data is an object. Store it in a file, mysql or...
    return

if __name__ == '__main__':
    main()

Still going through this huge module. Not sure if save_data() is executed by the parent process or if this function is used by the spawned child processes. If it's the child that does the saving, it might lead to concurrency issues in some situations. If anyone has more experience using this module, more knowledge would be appreciated here...
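
For what it's worth, with multiprocessing.Pool the callback passed to apply_async is invoked in the parent process (by the Pool's result-handler thread), so save_data() does not race between child processes. A small sketch (hypothetical names, not from the original answer) that makes this visible by printing process ids:

import os
from multiprocessing import Pool

def work(x):
    # runs in a worker (child) process
    return x * x, os.getpid()

def on_result(result):
    # callback: runs in the parent process, on the Pool's result-handler thread
    value, worker_pid = result
    print('parent pid=%s worker pid=%s value=%s' % (os.getpid(), worker_pid, value))

if __name__ == '__main__':
    pool = Pool()
    for i in range(4):
        pool.apply_async(work, (i,), callback=on_result)
    pool.close()
    pool.join()

Since callbacks run serially in the parent, writing to a single file from the callback is safe; the usual caveat is to keep the callback quick, otherwise result handling becomes a bottleneck.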
