泳池工人没有完成所有任务 [英] Pool workers do not complete all tasks

查看:61
本文介绍了泳池工人没有完成所有任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个相对简单的python多处理脚本,该脚本设置了一个工作池,这些工作池通过自定义管理器将输出追加到熊猫dataframe.我发现的是,当我在池上调用close()/join()时,apply_async提交的所有任务并未全部完成.

I have a relatively simple python multiprocessing script that sets up a pool of workers that append output to a pandas dataframe by way of a custom manager. What I am finding is when I call close()/join() on the pool, not all the tasks submitted by apply_async are being completed.

这是一个简化的示例,该示例提交了1000个作业,但只有一半完成,从而导致断言错误.我是否忽略了一些非常简单的东西,或者这可能是一个错误?

Here's a simplified example that submits 1000 jobs but only half complete causing an assertion error. Have I overlooked something very simple or is this perhaps a bug?

from pandas import DataFrame
from multiprocessing.managers import BaseManager, Pool

class DataFrameResults:
    def __init__(self):
        self.results = DataFrame(columns=("A", "B")) 

    def get_count(self):
        return self.results["A"].count()

    def register_result(self, a, b):
        self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)

class MyManager(BaseManager): pass

MyManager.register('DataFrameResults', DataFrameResults)

def f1(results, a, b):
    results.register_result(a, b)

def main():
    manager = MyManager()
    manager.start()
    results = manager.DataFrameResults()

    pool = Pool(processes=4)

    for (i) in range(0, 1000):
        pool.apply_async(f1, [results, i, i*i])
    pool.close()
    pool.join()

    print results.get_count()
    assert results.get_count() == 1000

if __name__ == "__main__":
    main()

推荐答案

您看到的问题是由于以下代码引起的:

The issue which you're seeing is because of this code:

self.results = self.results.append(...)

这不是原子的.因此,在某些情况下,线程将在读取self.results之后(或在追加时)被中断,但是在将新帧分配给self.results->之前,该实例将丢失.

this isn't atomic. So in some cases, the thread will be interrupted after reading self.results (or while appending) but before it can assign the new frame to self.results -> this instance will be lost.

正确的解决方案是等待使用结果对象获取结果,然后将所有结果附加到主线程中.

The correct solution is to wait use the results objects to get the results and then append all of them in the main thread.

这篇关于泳池工人没有完成所有任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆