Python Multiprocessing: Handling Child Errors in Parent


Problem Description



I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from MongoDB, map it into a relational (flat) structure, convert all values to strings, and insert them into MySQL.

Each of these steps is submitted as a process and given import/export queues, save for the MongoDB export, which is handled in the parent.

As you will see below, I use queues, and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that if a child process runs into an unhandled exception, this is not recognized by the parent and the rest just keeps running. What I want to happen is that the whole shebang quits and, ideally, re-raises the child error.

I have two questions:

  1. How do I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" on the queue to kill the child is pretty dirty.

I am using Python 2.7.

Here are the essential parts of my code:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. Currently only one,
    # but I have the option to parallelize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... initializing mongo db connection ...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()

Solution

I don't know of a standard practice, but what I've found is that to get reliable multiprocessing I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over, if necessary)
  • Always provide a shared error multiprocessing.Queue from the main process to each worker process
  • Enclose the entire run code in a try: ... except Exception as e. Then, when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
  • Of course, handle expected issues as usual within the normal operation of the worker
  • (similar to what you said already) Assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • Define a stop token in the class or for functions.
    • When the main process wants the worker(s) to stop, just send the stop token. To stop everyone, send enough tokens for all the processes.
    • The wrapping loop checks the input queue for the token (or whatever other input you want); a minimal sketch of a worker built this way follows the list.
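
Here is a minimal sketch of a worker built this way, assuming Python 2.7 to match the question. The SafeWorker class, its work() method, and the queue names are illustrative placeholders rather than anything from the original code; the parts that matter are the shared error queue, the catch-all around the run loop, and the stop-token check.

import multiprocessing
import os
import traceback
from Queue import Empty   # Python 2.7; in Python 3 this is queue.Empty

STOP_TOKEN = None          # sentinel put on the input queue to stop a worker

class SafeWorker(multiprocessing.Process):
    # Worker that reports any unexpected exception to the parent
    # through a shared error queue instead of dying silently.

    def __init__(self, input_q, output_q, error_q):
        multiprocessing.Process.__init__(self)
        self.input_q = input_q
        self.output_q = output_q
        self.error_q = error_q

    def work(self, item):
        # per-item processing goes here (mapping, converting, writing, ...)
        return item

    def run(self):
        try:
            while True:
                try:
                    item = self.input_q.get(timeout=1)
                except Empty:
                    continue                 # nothing to do yet, keep polling
                if item is STOP_TOKEN:
                    break                    # parent asked this worker to stop
                self.output_q.put(self.work(item))
        except Exception:
            # ship the pid plus the formatted traceback (the "original
            # context") to the parent, then exit quietly
            self.error_q.put((os.getpid(), traceback.format_exc()))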

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly, since you can handle whatever you need to do after the catch-all exception, and you will also know when you need to restart a worker.
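
A matching parent-side sketch, building on the hypothetical SafeWorker above (same imports): the parent polls the shared error queue while workers are alive, and on the first reported failure it sends a stop token to every worker, joins them, and re-raises, so the whole pipeline exits instead of silently carrying on. The normal shutdown (one stop token per worker once the input is exhausted) works exactly like the None handling in the question and is only hinted at here.

if __name__ == '__main__':
    input_q = multiprocessing.Queue()
    output_q = multiprocessing.Queue()
    error_q = multiprocessing.Queue()

    workers = [SafeWorker(input_q, output_q, error_q) for _ in range(10)]
    for w in workers:
        w.start()

    # ... feed input_q here, then put one STOP_TOKEN per worker for the
    #     normal shutdown, just like the None handling in the question ...

    while any(w.is_alive() for w in workers) or not error_q.empty():
        try:
            failed_pid, tb = error_q.get(timeout=1)
        except Empty:
            continue
        for w in workers:                    # tell every worker to stop
            input_q.put(STOP_TOKEN)
        for w in workers:
            w.join()
        raise RuntimeError('worker %d died:\n%s' % (failed_pid, tb))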

Again, I've just come to this pattern through trial and error, so I don't know how standard it is. Does that help with what you are asking for?
