发球的发电机上的多处理 [英] multiprocessing on tee'd generators
问题描述
请考虑以下脚本,在该脚本中,我测试对通过itertools.tee
获得的生成器进行一些计算的两种方式:
Consider the following script in which I test two ways of performing some calculations on generators obtained by itertools.tee
:
#!/usr/bin/env python3
from sys import argv
from itertools import tee
from multiprocessing import Process
def my_generator():
for i in range(5):
print(i)
yield i
def double(x):
return 2 * x
def compute_double_sum(iterable):
s = sum(map(double, iterable))
print(s)
def square(x):
return x * x
def compute_square_sum(iterable):
s = sum(map(square, iterable))
print(s)
g1, g2 = tee(my_generator(), 2)
try:
processing_type = argv[1]
except IndexError:
processing_type = "no_multi"
if processing_type == "multi":
p1 = Process(target=compute_double_sum, args=(g1,))
p2 = Process(target=compute_square_sum, args=(g2,))
print("p1 starts")
p1.start()
print("p2 starts")
p2.start()
p1.join()
print("p1 finished")
p2.join()
print("p2 finished")
else:
compute_double_sum(g1)
compute_square_sum(g2)
这是我在正常"模式下运行脚本时获得的信息:
Here is what I obtain when I run the script in "normal" mode:
$ ./test_tee.py
0
1
2
3
4
20
30
这里是并行模式:
$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished
最初的生成器显然已被复制",并执行了两次.
The initial generator is apparently somehow "copied", and executed twice.
我想避免这种情况,因为在我的实际应用程序中,这似乎在我用来制作初始生成器的外部库之一中引起了一个错误(
I would like to avoid this because in my real application, this seems to induce a bug in one of the external libraries I'm using to make the initial generator (https://github.com/pysam-developers/pysam/issues/397), and still be able to do computations in parallel on the same generated values.
有没有办法实现我想要的?
Is there a way to achieve what I want ?
推荐答案
我在这里找到了另一种替代方法: https://stackoverflow.com/a/26873783/1878788 .
I found some alternative way of doing here: https://stackoverflow.com/a/26873783/1878788.
在这种方法中,我们不再准备生成器.我们只是复制其生成的项目,并将其提供给仅在一个过程中对生成的项目进行并行处理的复合函数,但是我们通过使用Pool
来利用多处理功能(这就是所谓的映射/归约方法?):
In this approach we don't tee the generator any more. We just duplicate its generated items and feed them to a composite function that does the parallel treatment on the generated items within one process only, but we take advantage of multiprocessing by using a Pool
(is this what is called a map/reduce approach?):
#!/usr/bin/env python3
from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add
def my_generator():
for i in range(5):
print(i)
yield i
def double(x):
return 2 * x
def square(x):
return x * x
def double_and_square(args_list):
return (double(*args_list[0]), square(*args_list[1]))
def sum_tuples(tup1, tup2):
return tuple(starmap(add, zip(tup1, tup2)))
with Pool(processes=5) as pool:
results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))
print(reduce(sum_tuples, results_generator))
这适用于玩具示例.现在,我必须弄清楚如何在实际应用案例中类似地组织我的计算.
This works on the toy example. I now have to figure out how to similarly organize my computations in the real application case.
我尝试使用高阶函数(make_funcs_applier
)来概括此问题以生成复合函数(apply_funcs
),但是出现以下错误:
I tried to generalize this using a higher order function (make_funcs_applier
) to generate the composite function (apply_funcs
), but I get the following error:
AttributeError: Can't pickle local object 'make_funcs_applier.<locals>.apply_funcs'
更普遍的尝试
根据评论中的建议,我尝试改进上述解决方案以使其更可重用:
A more generalized attempt
Based on a suggestion in the comments, I tried to improve the above solution to be more re-usable:
#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""
from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add
# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
# def tuple_func(args_list):
# return tuple(func(args) for func, args in zip(funcs, args_list))
# return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))
class FuncApplier(object):
"""This kind of object can be used to group functions and call them on a
tuple of arguments."""
__slots__ = ("funcs", )
def __init__(self, funcs):
self.funcs = funcs
def __len__(self):
return len(self.funcs)
def __call__(self, args_list):
return tuple(func(args) for func, args in zip(self.funcs, args_list))
def fork_args(self, args_list):
"""Takes an arguments list and repeat them in a n-tuple."""
return tuple(repeat(args_list, len(self)))
def sum_tuples(*tuples):
"""Element-wise sum of tuple items."""
return tuple(starmap(add, zip(*tuples)))
# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
return x + 1
def double(x):
return 2 * x
def square(x):
return x * x
def main():
def my_generator():
for i in range(5):
print(i)
yield i
test_tuple_func = FuncApplier((plus_one, double, square))
with Pool(processes=5) as pool:
results_generator = pool.imap_unordered(
test_tuple_func,
(test_tuple_func.fork_args(args_list) for args_list in my_generator()))
print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
sum_tuples, results_generator))
exit(0)
if __name__ == "__main__":
exit(main())
测试:
$ ./test_fork.py
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30
对我来说,还有一些烦人的限制,因为我倾向于经常在代码中定义局部函数.
There are still some annoying limitations for me because I tend to often define local functions in my code.
这篇关于发球的发电机上的多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!