Python multiprocessing with generator


Problem Description

I'm trying to process a file in which every line is a JSON document. The file can range from hundreds of MBs up to a few GBs, so I wrote a generator to fetch each document from the file line by line:

import codecs
import json

def jl_file_iterator(file):
    # Stream one document per line so the whole file never has to fit in memory.
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document
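A quick usage sketch (the path sample.jsonl is a placeholder); because jl_file_iterator is a generator, documents are parsed one at a time rather than all at once:

# Hypothetical usage; 'sample.jsonl' is a placeholder path.
for document in jl_file_iterator('sample.jsonl'):
    print(document)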

My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and hands each batch off for parallel processing:

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # Hand off a full batch of 4 documents and wait for all of them.
        parallelProcess(files, o)
        files = []
    i += 1

# Process whatever is left over in the final partial batch.
if files:
    parallelProcess(files, o)
    files = []

This is the code where the actual processing happens:

from multiprocessing import Process

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    # Block until every process in this batch has finished.
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ... do some processing on doc
    # o is a global output file handle opened elsewhere.
    o.write(json.dumps(doc) + '\n')

As you can see, I wait for all 4 lines to finish processing before I send the next 4 documents off. But what I would like is to assign the next line to a released processor as soon as any one process finishes with its document. How do I do that?

PS: The problem is that since it's a generator, I cannot load all the documents up front and use something like map to run the processes.
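(Aside: multiprocessing.Pool.imap_unordered does accept a generator, so a variant like the sketch below avoids materializing the file. Note, though, that the pool's internal task-feeding thread reads ahead of the workers by an amount you don't directly control, which is one reason the answer below reaches for an explicit bounded queue. handle_line and the file names are placeholders.)

import json
import multiprocessing as mp

def handle_line(line):
    # Placeholder worker: parse one line and return the processed result.
    doc = json.loads(line)
    return json.dumps(doc)

if __name__ == '__main__':
    with open('input.jsonl') as f, open('output.jsonl', 'w') as out:
        with mp.Pool(4) as pool:
            # Results arrive as workers finish; only the parent writes,
            # so output lines never interleave.
            for processed in pool.imap_unordered(handle_line, f, chunksize=16):
                out.write(processed + '\n')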

Thanks for any help.

Answer

As @pvg said in a comment, a (bounded) queue is the natural way to mediate between a producer and consumers running at different speeds, ensuring they all stay as busy as possible without letting the producer get way ahead.

Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue grow bigger than that.

In your specific case, it would probably make sense to pass raw lines to the consumers and let them do the document = json.loads(line) part in parallel; a sketch adapted along those lines follows the example below.

import multiprocessing as mp
from time import sleep

NCORE = 4

def process(q, iolock):
    # Runs in each worker: loop forever, pulling work items off the
    # shared queue until the None sentinel arrives.
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    # The "initializer" never returns, so each pool worker spends its
    # life inside process(), consuming from the bounded queue.
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
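Following that suggestion, here is a minimal sketch adapting the same bounded-queue pattern to the question's JSONL file, with each worker doing the json.loads itself. The file names input.jsonl and output.jsonl and the elided processing step are placeholders:

import json
import multiprocessing as mp

NCORE = 4

def worker(q, iolock, outpath):
    # Each worker pulls raw lines off the bounded queue, parses them in
    # parallel, and appends results under the lock so writes don't interleave.
    while True:
        line = q.get()
        if line is None:
            break
        doc = json.loads(line)
        # ... do some processing on doc (placeholder)
        with iolock:
            with open(outpath, 'a') as out:
                out.write(json.dumps(doc) + '\n')

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=worker, initargs=(q, iolock, 'output.jsonl'))
    with open('input.jsonl') as f:
        for line in f:
            q.put(line)  # blocks whenever the workers fall behind
    for _ in range(NCORE):  # one sentinel per worker
        q.put(None)
    pool.close()
    pool.join()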
