Multiprocessing with shared queue and end criteria


Problem description

I've got this original function that I want to switch to multiprocessing:

def optimal(t0, tf, frequences, delay, ratio=0):
    # delay is an iterable of candidate spacings between signals
    for s in delay:
        offset = 0                          # running offset between signals
        timelines = list()

        for i in range(len(frequences)):
            timelines.append(time_builder(frequences[i], t0 + offset, tf))
            offset += s

        trio_overlap = trio_combination(timelines, ratio)

        valid = True
        for items in trio_overlap.values():
            if len(list(set(items))) != len(items):
                valid = False               # duplicate entries: invalid

        if not valid:
            continue

        overlap = duo_combination(timelines)

    optimal = ...                           # depending on conditions
    return optimal

If valid = True after the test, it will compute an optimization parameter called optim_param and try to minimize it. If it gets under a certain threshold, optim_param < 0.3, I break out of the loop and take this value as my answer.
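
To make that control flow concrete, here is a minimal sketch of how the step might sit at the end of the loop body; compute_optim_param is a hypothetical stand-in for the unshown computation:

for s in delay:
    ...                                     # build and validate timelines as above
    # hypothetical sketch of the early exit described above
    optim_param = compute_optim_param(timelines, overlap)
    if optim_param < 0.3:                   # below the threshold: good enough
        optimal = optim_param
        break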

My problem is that as I develop my model, the complexity is starting to rise, and single thread computation takes too long. I would like to process the computation in parallel. Since each process will have to compare the result obtained with an s value to the current optimal, I tried to implement a Queue.
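
For context, a bare-bones version of that idea might look like the sketch below (mine, not the actual attempt; evaluate and the delay list are hypothetical stand-ins), with one process per s value reporting back through a multiprocessing.Queue:

import multiprocessing

# Hypothetical sketch of the Queue idea described above; evaluate(s)
# stands in for the per-s computation and returns (optim_param, s).
def worker(s, queue):
    queue.put(evaluate(s))

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(s, queue))
             for s in delay]
    for p in procs:
        p.start()

    best = None
    for _ in procs:
        param, s = queue.get()              # one result per worker
        if best is None or param < best[0]:
            best = (param, s)
        if best[0] < 0.3:                   # threshold met: stop collecting
            break

    for p in procs:
        p.terminate()                       # unclean: kills any stragglers
        p.join()

Note the terminate at the end: shutting down leftover workers cleanly once a good result arrives is exactly the difficulty the answer below addresses.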

It's my first time doing multiprocessing, and even though I think I'm on the right track, I kinda feel like my code is messy and incomplete. Could I get some help?

Thanks :D

Recommended answer

Instead of manually creating a process for each case, consider using Pool.imap_unordered. The trick is how to shut down cleanly once a passable result is obtained: you can implement this by passing a generator that checks a flag every cycle and exits early once it is set. The main program reads from the iterator, keeps the best result seen, and sets the flag when that result is good enough. The final trick is to slow down the (internal) thread that reads from the generator, to prevent a large backlog of scheduled tasks that would have to be waited on (or, uncleanly, killed) after the good result is obtained. Given the number of processes in the pool, that pacing can be achieved with a semaphore.

Here's an example (with trivial analysis) to demonstrate:

import multiprocessing
import os
import threading

def interrupted(data, sem, interrupt):
    # Yield tasks, pausing on the semaphore after each one and stopping
    # as soon as the interrupt flag has become truthy.
    for x in data:
        yield x
        sem.acquire()
        if interrupt:
            break

def analyze(x):
    return x ** 2                   # trivial stand-in for the real analysis

if __name__ == '__main__':
    nproc = os.cpu_count()
    pool = multiprocessing.Pool(nproc)
    # An initial count of nproc-1 keeps the generator at most nproc tasks
    # ahead of the results consumed below.
    sem = threading.Semaphore(nproc - 1)
    token = []                      # mutable: appending makes it truthy

    vals = pool.imap_unordered(analyze,
                               interrupted(range(-10, 10), sem, token))
    pool.close()                    # optional: to let processes exit faster

    best = None
    for res in vals:
        if best is None or res < best:
            best = res
            if best < 5:
                token.append(None)  # make the flag truthy
        sem.release()
    pool.join()

    print(best)

There are of course other ways to share the semaphore and interrupt flag with the generator; this way uses an ugly data type but has the virtue of using no global variables (or even closures).
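
For illustration, here is one such alternative (a sketch under the same assumptions): the flag becomes a threading.Event shared with the generator through a closure, trading away the no-closures property noted above:

import multiprocessing
import os
import threading

def analyze(x):
    return x ** 2                   # same trivial stand-in as above

if __name__ == '__main__':
    nproc = os.cpu_count()
    pool = multiprocessing.Pool(nproc)
    sem = threading.Semaphore(nproc - 1)
    stop = threading.Event()        # replaces the mutable-list token

    def interrupted(data):          # closure over sem and stop
        for x in data:
            yield x
            sem.acquire()
            if stop.is_set():
                break

    vals = pool.imap_unordered(analyze, interrupted(range(-10, 10)))
    pool.close()

    best = None
    for res in vals:
        if best is None or res < best:
            best = res
            if best < 5:
                stop.set()          # signal the generator to stop
        sem.release()
    pool.join()

    print(best)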
