Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?
Problem description
Suppose I have N generators gen_1, ..., gen_N, where each of them will yield the same number of values. I would like a generator gen such that it runs gen_1, ..., gen_N in N parallel processes and yields (next(gen_1), next(gen_2), ..., next(gen_N)).
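Ignoring parallelism for a moment, the desired behaviour is exactly `zip(*gens)`. A minimal sequential sketch of the target semantics (`gen_i` is a hypothetical stand-in for the sub-generators):

```python
def gen_i(a):
    """Hypothetical example of one of the N sub-generators."""
    for b in ['a', 'b', 'c']:
        yield b + str(a)

gens = [gen_i(a) for a in range(4)]  # gen_1, ..., gen_N with N = 4
rows = list(zip(*gens))              # the sequential equivalent of gen()
print(rows[0])  # ('a0', 'a1', 'a2', 'a3')
```

The question is how to get the same tuples while each sub-generator advances in its own process.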
That is what I want:

```python
def gen():
    yield (next(gen_1), next(gen_2), ..., next(gen_N))
```

in such a way that each gen_i is running in its own process. Is it possible to do this? I have tried doing this in the following dummy example with no success:
```python
from multiprocessing import Process

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()
```
However, I get the error TypeError: cannot pickle 'generator' object.
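That error is exactly the constraint the accepted answer works around: a running generator object cannot be pickled, but the generator *function* plus its arguments can be, so each child process must build its own generator. A minimal sketch of the distinction (the `gen` here is this question's dummy generator):

```python
import pickle

def gen(a):
    for b in ['a', 'b', 'c']:
        yield b + str(a)

# A *running* generator object cannot be pickled...
try:
    pickle.dumps(gen(0))
except TypeError as e:
    print(e)  # cannot pickle 'generator' object

# ...but the generator function and its arguments can be,
# so a child process can reconstruct and run the generator itself.
func, arg = pickle.loads(pickle.dumps((gen, 0)))
print(next(func(arg)))  # 'a0'
```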
I have modified @darkonaut's answer a bit to fit my needs. I am posting it in case some of you find it useful. We first define a couple of utility functions:
```python
from itertools import zip_longest
from typing import List, Generator

def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]
```
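For intuition, `grouper` is the standard itertools recipe: it chunks an iterable into fixed-length groups by zipping n references to the same iterator, padding the last group with the fill value. A quick illustrative run (inputs chosen arbitrarily):

```python
from itertools import zip_longest

def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks."
    args = [iter(iterable)] * n  # n references to the *same* iterator
    return zip_longest(*args, fillvalue=fillvalue)

print(list(grouper('ABCDEFG', 3, fillvalue='x')))
# [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]
```

In the class below the fill value is an empty iterator, so padded slots simply contribute nothing when the chunk is zipped.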
The following class is responsible for splitting any number of generators into n (number of processes) batches and processing them, yielding the desired result:
```python
import itertools
import multiprocessing as mp

class GeneratorParallelProcessor:
    SENTINEL = 'S'

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.n_processes = n_processes
        self.generators = split_generators_into_batches(list(generators), n_processes)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(n_processes + 1)
        self.sentinels = [self.SENTINEL] * n_processes
        self.processes = [
            mp.Process(target=self._worker, args=(self.barrier, self.queue, gen))
            for gen in self.generators
        ]

    def process(self):
        for p in self.processes:
            p.start()
        while True:
            results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
            if results != self.sentinels:
                yield results
                self.barrier.wait()
            else:
                break
        for p in self.processes:
            p.join()

    def _worker(self, barrier, queue, generator):
        for x in generator:
            queue.put(x)
            barrier.wait()
        queue.put(self.SENTINEL)
```
To use it, just do the following:
```python
parallel_processor = GeneratorParallelProcessor(generators)

for grouped_generator in parallel_processor.process():
    output_handler(grouped_generator)
```
Answer
It's possible to get such a "Unified Parallel Generator (UPG)" (an attempt to coin a name) with some effort, but as @jasonharper already mentioned, you definitely need to assemble the sub-generators within the child processes, since a running generator can't be pickled.
The pattern below is reusable, with only the generator function gen() being custom to this example. The design uses multiprocessing.SimpleQueue for returning generator results to the parent and multiprocessing.Barrier for synchronization.
Calling Barrier.wait() will block the caller (a thread in any process) until the specified number of parties has called .wait(), whereupon all threads currently waiting on the Barrier are released simultaneously. The use of Barrier here ensures that further generator results only start being computed after the parent has received all results from an iteration, which might be desirable to keep overall memory consumption in check.
The number of parallel workers used equals the number of argument tuples you provide within the gen_args_tuples iterable, so gen_args_tuples=zip(range(4)) will use four workers, for example. See the comments in the code for further details.
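To see why zip(range(4)) yields four workers: zipping a single iterable produces one 1-tuple per element, and each tuple is unpacked as the argument list for one gen_func call.

```python
args = list(zip(range(4)))
print(args)  # [(0,), (1,), (2,), (3,)]
# Each 1-tuple becomes the *gen_args of one worker, i.e. gen(0), ..., gen(3).
```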
```python
import multiprocessing as mp

SENTINEL = 'SENTINEL'

def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)

def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)

def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
    built from `gen_func(gen_args)`.
    """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]
    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()

if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)
```
Output:
```
WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0
```