How to zip iterators in parallel using threading?


Question

Say I have N generators that produce a stream of items: gs = [..]  # list of generators.

I can easily zip them together to get a generator of tuples, drawing one item from each respective generator in gs: tuple_gen = zip(*gs).

This calls next(g) on each g in gs in sequence and gathers the results in a tuple. But if each item is costly to produce, we may want to parallelize the next(g) calls across multiple threads.
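For reference, the sequential behaviour of zip(*gs) can be sketched with a toy example (make_gen is just a hypothetical stand-in for the real item-producing generators); note that zip stops as soon as the shortest generator is exhausted:

```python
# Toy illustration of the sequential baseline: zip(*gs) draws one item
# from each generator per step and stops at the shortest stream.
def make_gen(start, stop):
    for i in range(start, stop):
        yield i

gs = [make_gen(0, 3), make_gen(10, 13), make_gen(20, 21)]
tuple_gen = zip(*gs)
print(list(tuple_gen))  # [(0, 10, 20)] -- third generator ends after one item
```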

How can I implement a pzip(..) that does this?

Answer

What you asked for can be achieved by creating a generator which yields the results from apply_async calls on a ThreadPool.

FYI, I benchmarked this approach with the pandas.read_csv iterators you get when specifying the chunksize parameter. I created eight copies of a csv file with 1M rows each and specified chunksize=100_000.

Four of the files were read with the sequential method you provided, four with the mt_gen function below, using a pool of four threads:

  • single threaded ~ 3.68 s
  • multi-threaded ~ 1.21 s

This doesn't mean it will improve results for every hardware and data setup, though.

import time
import threading
from multiprocessing.dummy import Pool  # dummy uses threads


def _load_sim(x=10e6):
    # simulate an expensive item: busy-loop work plus a one-second sleep
    for _ in range(int(x)):
        x -= 1
    time.sleep(1)


def gen(start, stop):
    for i in range(start, stop):
        _load_sim()
        print(f'{threading.current_thread().name} yielding {i}')
        yield i


def multi_threaded(gens):
    combi_g = mt_gen(gens)
    for item in combi_g:
        print(item)


def mt_gen(gens):
    with Pool(N_WORKERS) as pool:
        while True:
            # dispatch one next() call per generator to the thread pool
            async_results = [pool.apply_async(next, args=(g,)) for g in gens]
            try:
                results = [r.get() for r in async_results]
            except StopIteration:  # needed for Python 3.7+, PEP 479, bpo-32670
                return  # stop as soon as any generator is exhausted
            yield results


if __name__ == '__main__':

    N_GENS = 10
    N_WORKERS = 4
    GEN_LENGTH = 3

    gens = [gen(x * GEN_LENGTH, (x + 1) * GEN_LENGTH) for x in range(N_GENS)]
    multi_threaded(gens)

Output:

Thread-1 yielding 0
Thread-2 yielding 3
Thread-4 yielding 6
Thread-3 yielding 9
Thread-1 yielding 12
Thread-2 yielding 15
Thread-4 yielding 18
Thread-3 yielding 21
Thread-1 yielding 24
Thread-2 yielding 27
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Thread-3 yielding 7
Thread-1 yielding 10
Thread-2 yielding 4
Thread-4 yielding 1
Thread-3 yielding 13
Thread-1 yielding 16
Thread-4 yielding 22
Thread-2 yielding 19
Thread-3 yielding 25
Thread-1 yielding 28
[1, 4, 7, 10, 13, 16, 19, 22, 25, 28]
Thread-1 yielding 8
Thread-4 yielding 2
Thread-3 yielding 11
Thread-2 yielding 5
Thread-1 yielding 14
Thread-4 yielding 17
Thread-3 yielding 20
Thread-2 yielding 23
Thread-1 yielding 26
Thread-4 yielding 29
[2, 5, 8, 11, 14, 17, 20, 23, 26, 29]

Process finished with exit code 0
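The same round-based pattern can also be written with the standard library's concurrent.futures instead of multiprocessing.dummy. The pzip below is a sketch of that variant, not part of the original answer; note that it deliberately collects results with a list comprehension (not a generator expression), so the worker's StopIteration can be caught before PEP 479 would turn it into a RuntimeError:

```python
from concurrent.futures import ThreadPoolExecutor

def pzip(*gens, n_workers=4):
    # Each round, submit one next() call per generator to a thread pool;
    # stop as soon as any generator is exhausted, like zip().
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        while True:
            futures = [pool.submit(next, g) for g in gens]
            try:
                results = [f.result() for f in futures]
            except StopIteration:  # some generator ran dry
                return
            yield tuple(results)

gs = [iter(range(0, 3)), iter(range(10, 13))]
print(list(pzip(*gs)))  # [(0, 10), (1, 11), (2, 12)]
```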

