Python Multiprocessing: Handling Child Errors in Parent


Problem Description

I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from mongoDB, map it into a relational (flat) structure, convert all values to string and insert them into mysql.

Each of these steps is submitted as a process and given import/export queues, save for the mongoDB export which is handled in the parent.

As you will see below, I use queues, and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that, if a child process runs into an unhandled Exception, this is not recognized by the parent and the rest just keeps running. What I want to happen is that the whole shebang quits and at best re-raises the child error.
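
For illustration, such a sentinel-driven worker looks roughly like this (a minimal sketch only; the actual mapper/converter/writer classes are not shown in the question, and Worker here is a hypothetical stand-in):

import multiprocessing


class Worker(multiprocessing.Process):
    """Hypothetical worker: reads items from in_q, writes results to out_q,
    and terminates itself when it reads the None sentinel."""

    def __init__(self, in_q, out_q):
        super(Worker, self).__init__()
        self.in_q = in_q
        self.out_q = out_q

    def run(self):
        while True:
            item = self.in_q.get()
            if item is None:  # sentinel: no more work, shut down quietly
                break
            # placeholder for the real mapping / converting / writing step
            self.out_q.put(str(item))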

I have two questions:

  1. How can I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" on the queue to kill the child is pretty dirty.
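
As a side note on the first question: a child killed by an unhandled exception ends with a non-zero exitcode, so the parent can at least detect that something failed (though not recover the exception itself) by checking the Process handles, e.g.:

for p in workers:  # "workers" stands for any list of started child processes
    if not p.is_alive() and p.exitcode != 0:
        print 'child %s died with exitcode %s' % (p.name, p.exitcode)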

I am using python 2.7.

Here are the essential parts of my code:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. currently only one, 
    # but I have the option to parallelize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... initializing mongo db connection ...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()

Recommended Answer

I don't know standard practice but what I've found is that to have reliable multiprocessing I design the methods/class/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
  • always provide a shared error multiprocessing.Queue from the main process to each worker process
  • enclose the entire run code in a try: ... except Exception as e. Then when something unexpected happens send an error package with:
    • the process id that died
    • the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
  • define a stop token in the class or function.
  • when the main process wants the workers to stop, just send the stop token; to stop everyone, send enough for all the processes.
  • the wrapping loop checks the input q for the token or whatever other input you want (see the sketch after this list)
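
A rough sketch of that pattern in Python 2.7 (the names RobustWorker, STOP and work_on are illustrative assumptions, not a fixed API; the traceback is sent as a formatted string because traceback objects don't pickle):

import multiprocessing
import traceback

STOP = 'STOP'  # stop token; any dedicated sentinel value works


class RobustWorker(multiprocessing.Process):
    """Hypothetical worker built for multiprocessing: reports failures
    to the main process through a shared error queue."""

    def __init__(self, in_q, out_q, error_q):
        super(RobustWorker, self).__init__()
        self.in_q = in_q
        self.out_q = out_q
        self.error_q = error_q  # shared error queue back to the main process

    def run(self):
        try:
            while True:
                item = self.in_q.get()
                if item == STOP:  # stop token sent by the main process
                    break
                # expected problems are handled here as part of normal operation
                self.out_q.put(self.work_on(item))
        except Exception:
            # catch-all: report which process died and the original traceback
            self.error_q.put((self.pid, traceback.format_exc()))

    def work_on(self, item):
        return str(item)  # placeholder for the real work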

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly since you can handle whatever you need to do after the catch-all exception and you will also know when you need to restart a worker.
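
On the parent side this is usually paired with a small monitoring loop that drains the error queue and reacts, for example (again only a sketch, reusing the hypothetical names from the worker above):

import Queue  # Python 2 module name; it is "queue" in Python 3

STOP = 'STOP'  # same stop token the workers check for


def monitor(workers, error_q, in_q, poll_interval=1.0):
    """Watch the shared error queue; on the first child error, stop
    the remaining workers and re-raise the error in the parent."""
    while any(w.is_alive() for w in workers):
        try:
            pid, tb = error_q.get(timeout=poll_interval)
        except Queue.Empty:
            continue
        # a child reported an error: ask every worker to stop ...
        for _ in workers:
            in_q.put(STOP)
        for w in workers:
            w.join()
        # ... then surface the child's original traceback in the parent
        raise RuntimeError('worker %d failed:\n%s' % (pid, tb))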

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?
