Python: 'before' and 'after' for multiprocessing workers


Problem description

Update: Here is a more specific example

Suppose I want to compile some statistical data from a sizable set of files: I can make a generator (line for line in fileinput.input(files)) and some processor:

from collections import defaultdict
scores = defaultdict(int)

def process(line):
    if 'Result' in line:
        res = line.split('"')[1].split('-')[0]
        scores[res] += 1

The question is how to handle this when one gets to the multiprocessing.Pool.

Of course it's possible to define a multiprocessing.sharedctypes as well as a custom struct instead of a defaultdict but this seems rather painful. On the other hand I can't think of a pythonic way to instantiate something before the process or to return something after a generator has run out to the main thread.
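For the "before" part specifically, `multiprocessing.Pool` accepts an `initializer` callable that runs once in each worker process when the pool starts; the "after" part falls out naturally if each task returns its partial result to the parent. The sketch below is illustrative, not from the original question — `init_worker` and `process_chunk` are made-up names, and a regex stands in for the split-based parsing above:

```python
import re
from collections import defaultdict
from multiprocessing import Pool

# Per-worker state, populated by the initializer (the "before" hook).
result_re = None

def init_worker():
    """Runs once in each worker process when the pool starts."""
    global result_re
    result_re = re.compile(r'Result "([^-"]+)-')

def process_chunk(lines):
    """Tally one chunk locally and return the partial histogram;
    the return value travels back to the parent (the "after")."""
    scores = defaultdict(int)
    for line in lines:
        m = result_re.search(line)
        if m:
            scores[m.group(1)] += 1
    return dict(scores)

if __name__ == "__main__":
    chunks = [['Result "a-1"', 'Result "b-2"'], ['Result "a-3"']]
    with Pool(processes=2, initializer=init_worker) as pool:
        partials = pool.map(process_chunk, chunks)
    # Merge the partial results in the parent process.
    total = defaultdict(int)
    for part in partials:
        for key, n in part.items():
            total[key] += n
    print(dict(total))  # {'a': 2, 'b': 1}
```

Note that each task builds and returns its own local dict; accumulating into a long-lived per-worker global and returning it from every task would double-count whenever one worker handles several chunks.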

Answer

So you basically create a histogram. This can easily be parallelized, because histograms can be merged without complication. One might say that this problem is trivially parallelizable, or "embarrassingly parallel". That is, you do not need to worry about communication among workers.

Just split your data set into multiple chunks, let your workers work on these chunks independently, collect the histogram of each worker, and then merge the histograms.
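Since `collections.Counter` supports addition, the "merge the histograms" step is a one-liner; a small illustration with made-up partial counts:

```python
from collections import Counter

# Partial histograms as they might come back from three workers.
partials = [Counter(wolf=2, cat=1), Counter(wolf=1, blume=5), Counter(cat=3)]

# Counter addition sums counts key by key; sum() folds the list together.
merged = sum(partials, Counter())
print(merged["wolf"], merged["cat"], merged["blume"])  # 3 4 5
```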

In practice, this problem is best handled by letting each worker process read its own file. That is, a "task" can be a file name. You should not start pickling file contents and sending them between processes through pipes. Let each worker process retrieve the bulk data directly from files. Otherwise your architecture spends too much time on inter-process communication instead of doing real work.

Do you need an example or can you figure this out yourself?

I have a number of data files with file names in this format: data0.txt, data1.txt, ... .

Example contents:

wolf
wolf
cat
blume
eisenbahn

The goal is to create a histogram over the words contained in the data files. This is the code:

from multiprocessing import Pool
from collections import Counter
import glob


def build_histogram(filepath):
    """This function is run by a worker process.
    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""

    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update the first sub-histogram with the others.
    # Relevant docs: collections.Counter.update
    merged_hist = histograms[0]
    for h in histograms[1:]:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print("%s: %s" % (word, count))


if __name__ == "__main__":
    main()

Test output:

python countwords.py
eisenbahn: 12
auto: 6
cat: 1
katze: 10
stadt: 1
wolf: 3
zug: 4
blume: 5
herbert: 14
destruction: 4
