Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?


Problem Description

Suppose I have N generators gen_1, ..., gen_N, each of which will yield the same number of values. I would like a generator gen that runs gen_1, ..., gen_N in N parallel processes and yields (next(gen_1), next(gen_2), ..., next(gen_N)).

That is, I would like something like:

def gen():
    yield (next(gen_1), next(gen_2), ... next(gen_N))

in such a way that each gen_i runs in its own process. Is it possible to do this? I have tried to do so in the following dummy example, without success:

from multiprocessing import Process

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

But I get the error TypeError: cannot pickle 'generator' object.
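A minimal sketch of the underlying limitation (my own addition, not from the original post): a module-level generator function can be pickled by reference, but a running generator object created from it cannot, which is why it can't be shipped to a child process as an argument:

import pickle

def gen(a):
    yield a

pickle.dumps(gen)     # works: a module-level function is pickled by reference
pickle.dumps(gen(0))  # TypeError: cannot pickle 'generator' object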

I have modified @darkonaut's answer a bit to fit my needs. I am posting it here in case some of you find it useful. We first define a couple of utility functions:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    """Split the generators into at most `n_splits` chunks and zip each chunk."""
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]
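As a small illustration of how the batching behaves (my own example, not from the original post), six toy iterators split with n_splits=2 end up in two zip_longest batches of four slots each; the trailing None values appear where the empty fill iterators from grouper are exhausted:

gens = [iter(range(i, i + 3)) for i in range(6)]
batches = split_generators_into_batches(gens, n_splits=2)

print(next(batches[0]))  # (0, 1, 2, 3)        -> one item from each of the first four iterators
print(next(batches[1]))  # (4, 5, None, None)  -> None padding for the short last chunk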

The following class is responsible for splitting any number of generators into n batches (one per process) and processing them to yield the desired result:

import itertools
import multiprocessing as mp


class GeneratorParallelProcessor:
    SENTINEL = 'S'

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.generators = split_generators_into_batches(list(generators), n_processes)
        # One worker per batch; the batching may produce fewer batches than
        # `n_processes`, so size the barrier and sentinels accordingly.
        self.n_processes = len(self.generators)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(self.n_processes + 1)  # + 1 party for the parent
        self.sentinels = [self.SENTINEL] * self.n_processes

        self.processes = [
            mp.Process(target=self._worker, args=(self.barrier, self.queue, gen))
            for gen in self.generators
        ]

    def process(self):
        for p in self.processes:
            p.start()

        while True:
            results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
            if results != self.sentinels:
                yield results
                self.barrier.wait()
            else:
                break

        for p in self.processes:
            p.join()

    def _worker(self, barrier, queue, generator):
        for x in generator:
            queue.put(x)
            barrier.wait()
        queue.put(self.SENTINEL)

To use it, just do the following:

parallel_processor = GeneratorParallelProcessor(generators)

for grouped_generator in parallel_processor.process():
    output_handler(grouped_generator)
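For a concrete end-to-end run, here is a hypothetical driver (my own example, not from the original post). It assumes a fork-based start method such as the Linux default, so the generator objects built in the parent never have to be pickled; make_gen and print stand in for your own generator function and output_handler:

def make_gen(a):
    for ch in ['a', 'b', 'c']:
        yield ch + str(a)

if __name__ == '__main__':
    generators = [make_gen(a) for a in range(4)]
    # n_processes=3 makes the batching above produce two chunks of two
    # generators each, so two worker processes are spawned.
    parallel_processor = GeneratorParallelProcessor(generators, n_processes=3)
    for grouped_generator in parallel_processor.process():
        print(grouped_generator)  # e.g. ['a0', 'a1', 'a2', 'a3'] (batch order may vary)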

Recommended Answer

It's possible to get such a "Unified Parallel Generator" (UPG, an attempt to coin a name) with some effort, but as @jasonharper already mentioned, you definitely need to assemble the sub-generators within the child processes, since a running generator can't be pickled.

The pattern below is reusable, with only the generator function gen() being custom to this example. The design uses multiprocessing.SimpleQueue for returning generator results to the parent and multiprocessing.Barrier for synchronization.

Calling Barrier.wait() blocks the caller (a thread in any process) until the specified number of parties has called .wait(), whereupon all threads currently waiting on the Barrier are released simultaneously. Using a Barrier here ensures that further generator results only start to be computed after the parent has received all results from the current iteration, which can be desirable to keep overall memory consumption in check.
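A minimal standalone sketch of these Barrier semantics (my own illustration, not part of the answer), with two child processes plus the parent as the three parties:

import multiprocessing as mp
import time

def waiter(barrier, i):
    print(f"child {i} reached the barrier")
    barrier.wait()               # blocks until all 3 parties have called wait()
    print(f"child {i} released")

if __name__ == '__main__':
    barrier = mp.Barrier(3)      # parties = 2 children + 1 parent
    children = [mp.Process(target=waiter, args=(barrier, i)) for i in range(2)]
    for c in children:
        c.start()
    time.sleep(0.5)              # give the children time to reach the barrier
    barrier.wait()               # the parent is the last party; everyone proceeds
    for c in children:
        c.join()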

The number of parallel workers equals the number of argument tuples you provide in the gen_args_tuples iterable, so gen_args_tuples=zip(range(4)) will use four workers, for example. See the comments in the code for further details.
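For reference, zip(range(4)) just wraps each integer in a one-element argument tuple, one per worker:

>>> list(zip(range(4)))   # four argument tuples -> four workers: gen(0) ... gen(3)
[(0,), (1,), (2,), (3,)]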

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
    built from `gen_func(gen_args)`.
    """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

Output:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0
