Generator function of child processes runs in the parent process


Question

I am trying to run a generator in parallel in child processes. But when I tried this, I saw that the generator function was executed by the parent process!

from multiprocessing import Process
import os
import time

class p(Process):
    def __init__(self):
        Process.__init__(self)

    def run(self):
        print('PID:', os.getpid())

    def genfunc(self):
        time.sleep(1)
        yield os.getpid()

p1 = p()
p2 = p()

p1.start() 
p2.start() 
print('Iterators:')
print('Ran by:',next(p1.genfunc()))
print('Ran by:',next(p2.genfunc()))

Output:

PID: 20383
PID: 20384
Iterators:
Ran by: 20382
Ran by: 20382

My objective is to run the generator function in the child processes, thereby yielding results to the parent process.

Having had no success with putting the yield statement in the run() call, I tried the approach above.

Could someone please help me achieve my objective?

Recommended answer

I assume that by processing with generators you actually want to accomplish the following:

  1. The main process generates tasks lazily through a generator; each task is represented by some data (arg).
  2. The generator may produce these tasks very slowly, e.g. by fetching chunks of data from the Internet, so each task should be processed as soon as it is ready.
  3. The main process sends these tasks to several child processes for processing.
  4. Processing in the children may also take a long and random amount of time.
  5. The children should report results (the successfully processed data, or an encoded error on failure).
  6. The main process should also gather results lazily, i.e. report each one as soon as it is ready.
  7. Results can be gathered in the main process either in exactly the order the tasks were generated (strict order True) or in arbitrary order as soon as each is processed (strict order False); the second variant may be considerably faster.
  8. All CPU cores should be used for efficiency, one process per core.

For all these purposes I created an example template that you can adapt to your specific problem:

def ProcessTask(arg):
    import time, os
    print('Started task', arg[0], arg[1], 'by', os.getpid())
    time.sleep(arg[1])
    print('Finished task', arg[0], arg[1], 'by', os.getpid())
    return (arg[0], arg[1] * 2)

def Main():
    import multiprocessing as mp
    
    def GenTasks(n):
        import random, os, time
        for i in range(n):
            t = round(random.random() * 2., 3)
            print('Created task', i, t, 'by', os.getpid())
            yield (i, t)
            time.sleep(random.random())
            
    num_tasks = 4

    for strict_order in [True, False]:
        print('\nIs strict order', strict_order)
        with mp.Pool() as pool:
            for res in (pool.imap_unordered, pool.imap)[strict_order](
                ProcessTask, GenTasks(num_tasks)
            ):
                print('Result from task', res)
            
if __name__ == '__main__':
    Main()

Output:

Is strict order True
Created task 0 0.394 by 10536
Created task 1 0.357 by 10536
Started task 0 0.394 by 8740
Started task 1 0.357 by 5344
Finished task 1 0.357 by 5344
Finished task 0 0.394 by 8740
Result from task (0, 0.788)
Result from task (1, 0.714)
Created task 2 0.208 by 10536
Started task 2 0.208 by 5344
Finished task 2 0.208 by 5344
Result from task (2, 0.416)
Created task 3 0.937 by 10536
Started task 3 0.937 by 8740
Finished task 3 0.937 by 8740
Result from task (3, 1.874)

Is strict order False
Created task 0 1.078 by 10536
Started task 0 1.078 by 7256
Created task 1 0.029 by 10536
Started task 1 0.029 by 5440
Finished task 1 0.029 by 5440
Result from task (1, 0.058)
Finished task 0 1.078 by 7256
Result from task (0, 2.156)
Created task 2 1.742 by 10536
Started task 2 1.742 by 5440
Created task 3 0.158 by 10536
Started task 3 0.158 by 7256
Finished task 3 0.158 by 7256
Result from task (3, 0.316)
Finished task 2 1.742 by 5440
Result from task (2, 3.484)

PS:

  1. In the code above, and when using multiprocessing in general, the same single module script is used by both the main and the child processes. With the spawn start method (the default on Windows and macOS), the main process and every child start by executing the whole script: the if __name__ == '__main__': block is run only by the main process, while the rest of the module's code is executed by both main and children.
  2. It is good practice to put everything the main process needs to execute into one function (Main() in my case), everything the children execute into another (ProcessTask() in my case), and any functions and variables shared by both main and children into the global scope (my code has nothing shared).
  3. The processing function (ProcessTask() in my code) should be in the global scope of the module.
  4. Further documentation on multiprocessing is available here.
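
The first and third notes can be observed with a small experiment. The sketch below is only an illustration: the function names square and main are made up, and the spawn start method is forced through mp.get_context so that the behaviour is the same on every operating system.

```python
import multiprocessing as mp
import os

# Module-level code: with the spawn start method this line runs in the
# main process and again in every child that imports the module.
print('Module code executed in PID', os.getpid())


def square(x):
    # Lives at module level so that child processes can look it up by name.
    return x * x


def main():
    ctx = mp.get_context('spawn')  # assumption: force spawn for a uniform demo
    with ctx.Pool(2) as pool:
        print('Results:', pool.map(square, [1, 2, 3]))


if __name__ == '__main__':
    # Only the main process enters this block; children skip it.
    main()
```

Running it as a script should print the module-level line once for the main process and once more for each worker, whereas main() itself runs only in the main process.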
