Python 3: Catching warnings during multiprocessing


Problem description


The warnings.catch_warnings() context manager is not thread safe. How do I use it in a parallel processing environment?


The code below solves a maximization problem using parallel processing with Python's multiprocessing module. It takes a list of (immutable) widgets, partitions them up (see Efficient multiprocessing of massive, brute force maximization in Python 3), finds the maxima ("finalists") of all the partitions, and then finds the maximum ("champion") of those "finalists." If I understand my own code correctly (and I wouldn't be here if I did), I'm sharing memory with all the child processes to give them the input widgets, and multiprocessing uses an operating-system-level pipe and pickling to send the finalist widgets back to the main process when the workers are done.


I want to catch the redundant widget warnings being caused by widgets' re-instantiation after the unpickling that happens when the widgets come out of the inter-process pipe. When widget objects instantiate, they validate their own data, emitting warnings from the Python standard warnings module to tell the app's user that the widget suspects there is a problem with the user's input data. Because unpickling causes objects to instantiate, my understanding of the code implies that each widget object is reinstantiated exactly once if and only if it is a finalist after it comes out of the pipe -- see the next section to see why this isn't correct.


The widgets were already created before being frobnicated, so the user is already painfully aware of what input he got wrong and doesn't want to hear about it again. These are the warnings I'd like to catch with the warnings module's catch_warnings() context manager (i.e., a with statement).
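For reference, this is the single-process pattern in question: `catch_warnings()` saves the warnings filter state on entry and restores it on exit, so warnings can be recorded or suppressed inside the `with` block without leaking the filter change. A minimal sketch, with a hypothetical `make_widget` standing in for widget construction that validates its input:

```python
import warnings

def make_widget(value):
    # Hypothetical stand-in for widget construction: validates its
    # input and warns the user about suspicious data.
    if value < 0:
        warnings.warn("widget value {} is negative".format(value))
    return value

# record=True collects the warnings as a list instead of printing them;
# the previous filter state is restored when the with block exits.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    make_widget(-1)

print(len(caught))  # 1
```

With `record=True` the caught warnings are collected as `WarningMessage` objects, which is also convenient for asserting on them in tests.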


In my tests I've narrowed down when the superfluous warnings are being emitted to anywhere between what I've labeled below as Line A and Line B. What surprises me is that the warnings are being emitted in places other than just near output_queue.get(). This implies to me that multiprocessing sends the widgets to the workers using pickling.


The upshot is that putting a context manager created by warnings.catch_warnings() even around everything from Line A to Line B and setting the right warnings filter inside this context does not catch the warnings. This implies to me that the warnings are being emitted in the worker processes. Putting this context manager around the worker code does not catch the warnings either.


This example omits the code for deciding whether the problem size is too small to bother with forking processes, for importing multiprocessing, and for defining my_frobnal_counter and my_load_balancer.

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

Recommended answer


The unpickling would not cause the __init__ to be executed twice. I ran the following code on Windows, and it doesn't happen (each __init__ is run precisely once).
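This is easy to verify with pickle directly: default unpickling restores an object's `__dict__` without calling `__init__`, so validation run in `__init__` (and its warnings) should not fire again. A minimal sketch with an illustrative `Gadget` class:

```python
import pickle

class Gadget:
    def __init__(self):
        # Runs only at construction, not when the object is unpickled.
        print("Gadget.__init__ called")
        self.value = 42

g = Gadget()                           # prints: Gadget.__init__ called
clone = pickle.loads(pickle.dumps(g))  # __init__ is NOT called again
print(clone.value)                     # 42
```

So if `__init__` runs more than once per widget, the extra run is happening somewhere other than unpickling.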


Therefore, you need to provide us with the code from my_load_balancer and from widgets' class. At this point, your question simply doesn't provide enough information.


As a random guess, you might check whether my_load_balancer makes copies of widgets, causing them to be instantiated once again.
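One quick way to test that guess is an identity check: if the balancer hands back the original objects, every partitioned widget is identical (`is`) to one of the inputs; if it rebuilds or copies them, the check fails. The sketch below uses a deliberately copy-happy balancer and a minimal `W` class as stand-ins for `my_load_balancer` and the widget class:

```python
import copy

class W:
    """Minimal stand-in for the widget class."""
    pass

def copying_balancer(widgets):
    # Deliberately makes copies, to show what the identity check detects.
    yield {copy.deepcopy(w) for w in widgets}

originals = [W() for _ in range(4)]
for partition in copying_balancer(originals):
    # True iff some partitioned widget is not one of the original objects.
    made_copies = any(all(p is not o for o in originals)
                      for p in partition)
    print("balancer made copies:", made_copies)  # True
```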

import multiprocessing

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def my_load_balancer(widgets):
    partitions = tuple(set() for _ in range(8))
    for i, widget in enumerate(widgets):
        partitions[i % 8].add(widget)
    for partition in partitions:
        yield partition

def my_frobnal_counter(widget):
    return widget.id

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

class Widget:
    id = 0
    def __init__(self):
        print('initializing Widget {}'.format(self.id))
        self.id = Widget.id
        Widget.id += 1

    def __str__(self):
        return str(self.id)

    def __repr__(self):
        return str(self)

def main():

    widgets = [Widget() for _ in range(16)]
    result = frobnicate_parallel(widgets)
    print(result.id)


if __name__ == '__main__':
    main()
