Python multiprocessing and handling exceptions in workers


Problem description

I use the Python multiprocessing library for an algorithm in which many workers process certain data and return results to the parent process. I use one multiprocessing.Queue to pass jobs to the workers, and a second one to collect the results.

It all works pretty well, until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:

  • initialization - can fail; in this case the worker should be destroyed
  • data processing - processing a chunk of data can fail; in this case the worker should skip the chunk and continue with the next one.

When either of these phases fails, I get a deadlock after the script completes. This code simulates my problem:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print result

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()

I have two questions about this code:

  1. When init() fails, how do I detect that the worker is invalid and avoid waiting for it to finish?
  2. When do_work() fails, how do I notify the parent process that fewer results should be expected in the results queue?

Thank you for your help!

Solution

I changed your code slightly to make it work (see explanation below).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
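# Collect results until every job is accounted for (success or failure)
# or every worker has reported a failed init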
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print init_failures, job_failures, successes

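# One stop token per worker; tokens meant for workers that died during
# init are simply never consumed, which is harmless with a plain Queue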
for ii in range(workers_count):
    jobs.put(None)

My changes:

  1. Changed jobs to be just a normal Queue (instead of JoinableQueue).
  2. Workers now communicate the special result strings "init failed" and "job failed" back to the parent.
  3. The master process watches for these special results and counts them, so it knows when every job has been accounted for or every worker has failed to initialize.
  4. In the end, put "stop" requests (i.e. None jobs) for however many workers you have, regardless of failures. Note that not all of these may be pulled from the queue (in case a worker failed to initialize).
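
For anyone adapting this to Python 3, below is a minimal sketch of the same pattern. It is not a drop-in replacement: the tag names ('ok', 'init_failed', 'job_failed') and the hard-coded failure probabilities are illustrative choices, and it additionally joins the surviving worker processes at the end.

import multiprocessing as mp
import random

workers_count = 5
jobs_count = 30

def worker_function(worker_id, jobs_queue, result_queue):
    # Simulated init that may fail, as in the original example
    if random.random() < 0.5:
        result_queue.put(('init_failed', worker_id))
        return
    for job in iter(jobs_queue.get, None):
        try:
            if random.random() < 0.4:
                raise Exception("Job failed")
            result_queue.put(('ok', "job %d processed %d" % (worker_id, job)))
        except Exception:
            result_queue.put(('job_failed', job))

if __name__ == '__main__':
    jobs = mp.Queue()
    results = mp.Queue()
    procs = [mp.Process(target=worker_function, args=(i, jobs, results))
             for i in range(workers_count)]
    for p in procs:
        p.start()
    for j in range(jobs_count):
        jobs.put(j)

    init_failures = job_failures = successes = 0
    # Same exit condition as above: all jobs accounted for, or all workers dead
    while job_failures + successes < jobs_count and init_failures < workers_count:
        kind, payload = results.get()
        if kind == 'init_failed':
            init_failures += 1
        elif kind == 'job_failed':
            job_failures += 1
        else:
            successes += 1
            print(payload)

    # One stop token per worker; dead workers simply never read theirs
    for _ in range(workers_count):
        jobs.put(None)
    for p in procs:
        p.join()

Tagging results with a tuple instead of a bare string avoids any collision between the failure markers and legitimate result values.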

By the way, your original code was nice and easy to work with. The random probabilities bit is pretty cool.
