multiprocessing.Pool: calling helper functions when using apply_async's callback option

Question

How does the flow of apply_async work between calling the iterable (?) function and the callback function?

Setup: I am reading some lines of all the files inside a 2000-file directory, some with millions of lines, some with only a few. Some header/formatting/date data is extracted to characterize each file. This is done on a 16-CPU machine, so it made sense to multiprocess it.

Currently, the expected result is being sent to a list (ahlala) so I can print it out; later, this will be written to *.csv. This is a simplified version of my code, originally based off this extremely helpful post.

import multiprocessing as mp
import os

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count())
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala

Note, the code works if I put all of Z(), the helper function, into X(), Y(), or results(), but is this repetitive, or possibly slower than it could be? I know that the callback function is called for every function call, but when exactly is the callback function called? Is it after pool.apply_async() finishes all the jobs for the processes? Shouldn't it be faster if these helper functions were called within the scope of the first function pool.apply_async() takes (in this case, X())? If not, should I just put the helper function in results()?

Other related ideas: Are daemon processes why nothing shows up? I am also very confused about how to queue things, and whether this is the problem. This seems like a place to start learning it, but can queuing be safely ignored when using apply_async, or can it only be ignored at a noticeable cost in time efficiency?

Answer

You're asking about a whole bunch of different things here, so I'll try to cover it all as best I can:

The function you pass to callback will be executed in the main process (not the worker) as soon as the worker process returns its result. It is executed in a thread that the Pool object creates internally. That thread consumes objects from a result_queue, which is used to get the results from all the worker processes. After the thread pulls a result off the queue, it executes the callback. While your callback is executing, no other results can be pulled from the queue, so it's important that the callback finishes quickly. With your example, as soon as one of the calls to X or Y you make via apply_async completes, the result will be placed into the result_queue by the worker process, then the result-handling thread will pull the result off of the result_queue, and your callback will be executed.
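
To make this concrete, here is a minimal runnable sketch (my illustration, not part of the original answer) that prints the process ID and thread name inside the callback; you should see the main process's PID and a pool-internal thread name, while the worker reports a different PID:

import multiprocessing as mp
import os
import threading

def work(x):
    # Runs in a worker process.
    return (x, os.getpid())

def on_result(r):
    # Runs in the main process, on the Pool's internal result-handler thread.
    print("callback: pid=%d thread=%s result=%r"
          % (os.getpid(), threading.current_thread().name, r))

if __name__ == "__main__":
    pool = mp.Pool(2)
    for i in range(4):
        pool.apply_async(work, args=(i,), callback=on_result)
    pool.close()
    pool.join()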

Second, I suspect the reason you're not seeing anything happen with your example code is because all of your worker function calls are failing. If a worker function fails, callback will never be executed. The failure won't be reported at all unless you try to fetch the result from the AsyncResult object returned by the call to apply_async. However, since you're not saving any of those objects, you'll never know the failures occurred. If I were you, I'd try using pool.apply while you're testing so that you see errors as soon as they occur.
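
If you'd rather keep apply_async, here is a hedged sketch of the pattern this alludes to (assuming pool, files, X, and results from your code): save each AsyncResult and call get() on it after the pool finishes, which re-raises any exception from the worker:

async_results = []
for f in files:
    async_results.append(pool.apply_async(X, args=(f,), callback=results))
pool.close()
pool.join()

for ar in async_results:
    try:
        ar.get()  # re-raises any exception that occurred in the worker
    except Exception as e:
        print("worker failed: %r" % (e,))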

The reason the workers are probably failing (at least in the example code you provided) is because X and Y are defined as functions inside another function. multiprocessing passes functions and objects to worker processes by pickling them in the main process, and unpickling them in the worker processes. Functions defined inside other functions are not picklable, which means multiprocessing won't be able to successfully unpickle them in the worker process. To fix this, define both functions at the top level of your module, rather than embedded inside the dirwalker function.
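
The pickling restriction is easy to demonstrate directly; this small sketch (mine, not from the answer) pickles a top-level function successfully but fails on a nested one:

import pickle

def top_level():
    pass

def outer():
    def nested():
        pass
    return nested

pickle.dumps(top_level)  # works: the function can be found by name at import time
pickle.dumps(outer())    # raises (PicklingError in Python 2, AttributeError in
                         # Python 3): nested functions aren't importable by name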

You should definitely continue to call Z from X and Y, not in results. That way, Z can be run concurrently across all your worker processes, rather than having to be run one call at a time in your main process. And remember, your callback function is supposed to be as quick as possible, so you don't hold up processing results. Executing Z in there would slow things down.

Here's some simple example code that's similar to what you're doing, that hopefully gives you an idea of what your code should look like:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r)

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary condition to split up the list
                # In Python 3, apply_async also accepts an error_callback you can
                # use to handle worker errors; it's not available in Python 2.7.
                pool.apply_async(X, args=(f,), callback=results)
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))

Output:

['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]

You can create a dict object that's shared between your parent and child processes using the multiprocessing.Manager class:

pool = mp.Pool(mp.cpu_count())
m = mp.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)

Then make X and Y take a second argument called helper_dict (or whatever name you want), and you're all set.

The caveat is that this works by creating a server process that holds a normal dict, and all your other processes talk to that one dict via a Proxy object. So every time you read or write to the dict, you're doing IPC. This makes it a lot slower than a real dict.
