Processing single file from multiple processes
Question
I have a single big text file in which I want to process each line (do some operations) and store the results in a database. Since a single simple program takes too long, I want the work done by multiple processes or threads. Each thread/process should read DIFFERENT data (different lines) from that single file, perform some operations on its piece of data (lines), and put the results in the database, so that in the end all of the data is processed and the database holds the data I need.
But I am not able to figure out how to approach this.
Answer
What you are looking for is a Producer/Consumer pattern.
Basic threading example
Here is a basic example using the threading module (instead of multiprocessing):
import threading
import queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process the item (placeholder: pass it through unchanged)
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()
    total = 20

    # start the worker threads
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in range(total):
        work.put(i)

    work.join()

    # get the results
    for i in range(total):
        print(results.get())

    sys.exit()
You wouldn't share the file object with the threads. You would produce work for them by supplying the queue with lines of data. Each thread would then pick up a line, process it, and put the result on the results queue.
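For instance, the producer loop of the example above could feed file lines instead of integers. Here is a minimal sketch under that assumption; the filename input.txt and the placeholder "processing" are illustrative, not part of the original answer:

import threading
import queue

def do_work(in_queue, out_queue):
    while True:
        line = in_queue.get()
        # process the line (placeholder: just strip the newline)
        out_queue.put(line.strip())
        in_queue.task_done()

if __name__ == "__main__":
    work = queue.Queue()
    results = queue.Queue()

    # start the worker threads
    for i in range(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # the main thread owns the file object; workers only ever see lines
    count = 0
    with open("input.txt") as f:
        for line in f:
            work.put(line)
            count += 1

    work.join()

    for i in range(count):
        print(results.get())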
There are some more advanced facilities built into the multiprocessing module for sharing data, like lists and special kinds of queues. There are trade-offs to using multiprocessing vs. threads, and it depends on whether your work is CPU bound or IO bound.
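As a minimal sketch of one such facility (this snippet is illustrative, not from the original answer): multiprocessing.Queue passes pickled items between processes over a pipe, rather than sharing memory the way threads do:

from multiprocessing import Process, Queue

def worker(q):
    while True:
        item = q.get()
        # a None sentinel tells the worker to shut down
        if item is None:
            break
        print("got", item)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    for i in range(3):
        q.put(i)
    q.put(None)  # send the sentinel
    p.join()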
Basic multiprocessing Pool example
Here is a really basic example of a multiprocessing Pool:
from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print(results)
A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. map blocks and returns the entire result when it is done. Be aware that this is an overly simplified example, and that pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
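One such option, as a minimal sketch assuming Python 3: Pool.imap() streams results back as they finish instead of building one big list (its internal task feeder can still run ahead of the workers, so it is not a strict cap on input buffering):

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    with Pool(4) as pool:
        with open('file.txt') as source_file:
            # imap returns an iterator; consume results lazily
            for result in pool.imap(process_line, source_file, 4):
                print(result)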
A manual "pool" with a limit and line re-sorting
This is a manual version of Pool.map, but instead of consuming an entire iterable in one go, you set a queue size so that you only feed it piece by piece, as fast as it can process. I also added the line numbers so that you can track them and refer to them later if you want.
from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)
        out_list.append(result)

if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    # bounded queue: the producer blocks instead of reading the
    # whole file ahead of the workers
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    for i in range(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data, appending one None sentinel per worker
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example: [(1, "foo"), (10, "bar"), (0, "start")]
    print(sorted(results))
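Since the original question wants the processed lines stored in a database, here is a hypothetical sqlite3 sketch of that final step; the database file, table name, and schema are assumptions, not part of the original answer:

import sqlite3

def store(results):
    # results: the (line_no, line) tuples collected above
    conn = sqlite3.connect("output.db")  # hypothetical database file
    conn.execute("CREATE TABLE IF NOT EXISTS lines (line_no INTEGER, line TEXT)")
    conn.executemany("INSERT INTO lines VALUES (?, ?)", sorted(results))
    conn.commit()
    conn.close()

if __name__ == "__main__":
    store([(0, "start"), (1, "foo"), (10, "bar")])  # sample data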