Processing single file from multiple processes

Problem description

I have a single big text file in which I want to process each line (do some operations) and store them in a database. Since a single simple program is taking too long, I want it to be done via multiple processes or threads. Each thread/process should read different data (different lines) from that single file, do some operations on its piece of data (lines), and put it in the database so that, in the end, I have all of the data processed and my database is populated with the data I need.

But I am not able to figure out how to approach this.

Answer

What you are looking for is a Producer/Consumer pattern.

Basic threading example

Here is a basic example using the threading module (instead of multiprocessing):

import threading
import queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process the item (placeholder: real work would go here)
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()
    total = 20

    # start the workers
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in range(total):
        work.put(i)

    work.join()

    # get the results
    for i in range(total):
        print(results.get())

    sys.exit()

You wouldn't share the file object with the threads. You would produce work for them by supplying the queue with lines of data. Then each thread would pick up a line, process it, and return the result on the output queue.
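
For example, the producer could read the file in the main thread and feed each line to the workers through the queue. A minimal sketch under that assumption (the file name source.txt is a placeholder, not from the original answer):

import threading
import queue

def do_work(in_queue, out_queue):
    while True:
        line = in_queue.get()
        # placeholder "processing": strip the trailing newline
        out_queue.put(line.rstrip("\n"))
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()

    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # the main thread is the only one touching the file; the workers
    # only ever see individual lines handed to them via the queue
    # ("source.txt" is a placeholder file name)
    n = 0
    with open("source.txt") as f:
        for line in f:
            work.put(line)
            n += 1

    work.join()

    for _ in range(n):
        print(results.get())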

There are some more advanced facilities built into the multiprocessing module to share data, like lists and special kinds of Queue. There are trade-offs to using multiprocessing vs. threads, and it depends on whether your work is CPU bound or IO bound.

Basic multiprocessing.Pool example

Here is a really basic example of a multiprocessing Pool:

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print(results)

A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. Map blocks and returns the entire result when it's done. Be aware that this is an overly simplified example, and that pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
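
If memory is a concern, one option not shown in the original answer is pool.imap, which iterates the input lazily instead of converting it to a list up front, and yields results as the workers finish them. A minimal sketch, reusing the same hypothetical file.txt:

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    with Pool(4) as pool, open('file.txt') as source_file:
        # imap feeds lines to the workers in chunks of 4 rather than
        # materializing the whole file before starting
        for result in pool.imap(process_line, source_file, 4):
            print(result)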

Manual "pool" with limit and line re-sorting

This is a manual version of Pool.map, but instead of consuming the entire iterable in one go, you can set a queue size so that you only feed it piece by piece, as fast as it can process. I also added the line numbers so that you can track them and refer to them later if you want.

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    for i in range(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data, followed by one None sentinel per worker
    # so each worker knows when to exit
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print(sorted(results))
