How to emulate multiprocessing.Pool.map() in AWS Lambda?


Problem description


Python on AWS Lambda does not support multiprocessing.Pool.map(), as documented in this other question. Please note that the other question was asking why it doesn't work. This question is different, I'm asking how to emulate the functionality given the lack of underlying support.
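For reference, this is the standard pattern I'm trying to emulate (a minimal sketch; square is just an illustrative function). On Lambda, constructing the Pool itself typically fails, commonly surfacing as "OSError: [Errno 38] Function not implemented", because Pool relies on SemLock/shared memory that the Lambda execution environment does not provide:

# Reference only: what Pool.map looks like on a normal host.
# On AWS Lambda, creating the Pool typically raises
# "OSError: [Errno 38] Function not implemented".
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print(pool.map(square, [1, 2, 3, 4]))  # [1, 4, 9, 16]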


One of the answers to that other question gave us this code:

# Python 3.6
import multiprocessing
import multiprocessing.connection  # needed for multiprocessing.connection.wait() below
from multiprocessing import Pipe, Process

def myWorkFunc(data, connection):
    result = None

    # Do some work and store it in result

    if result:
        connection.send([result])
    else:
        connection.send([None])


def myPipedMultiProcessFunc():

    # Get number of available logical cores
    plimit = multiprocessing.cpu_count()

    # Setup management variables
    results = []
    parent_conns = []
    processes = []
    pcount = 0
    pactive = []
    i = 0

    for data in iterable:  # 'iterable' (the input data) is assumed to be defined elsewhere
        # Create the pipe for parent-child process communication
        parent_conn, child_conn = Pipe()
        # create the process, pass data to be operated on and connection
        process = Process(target=myWorkFunc, args=(data, child_conn,))
        parent_conns.append(parent_conn)
        process.start()
        pcount += 1

        if pcount == plimit: # There is not currently room for another process
            # Wait until there are results in the Pipes
            finishedConns = multiprocessing.connection.wait(parent_conns)
            # Collect the results and remove the connection as processing
            # the connection again will lead to errors
            for conn in finishedConns:
                results.append(conn.recv()[0])
                parent_conns.remove(conn)
                # Decrement pcount so we can add a new process
                pcount -= 1

    # Ensure all remaining active processes have their results collected
    for conn in parent_conns:
        results.append(conn.recv()[0])
        conn.close()

    # Process results as needed


Can this sample code be modified to support multiprocessing.Pool.map()?

What I've tried so far


I analysed the above code and I do not see a parameter for the function to be executed or the data, so I'm inferring that it does not perform the same function as multiprocessing.Pool.map(). It is not clear what the code does, other than demonstrating the building blocks that could be assembled into a solution.

Is this a "write my code for me" question?


Yes, to some extent it is. This issue impacts thousands of Python developers, and it would be far better for the world economy, greenhouse gas emissions, etc. if all of us shared the same code, instead of forcing every SO user who encounters this to go and develop their own workaround. I hope I've done my part by distilling this into a clear question with the presumed building blocks ready to go.

Answer


I was able to get this working for my own tests. I've based my code on this link: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/


NB1: you MUST increase the memory allocation of the Lambda function. With the default minimal amount, there is no performance gain from multiprocessing. With the maximum my account can allocate (3008MB), the figures below were attained.
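For reference, the memory setting can also be raised programmatically; here is a rough sketch using boto3 ("my-function" is a placeholder for your function's name), though the Lambda console works just as well:

import boto3

# Sketch only: bump the function's memory size, which also raises its CPU share.
# "my-function" is a placeholder for your Lambda function's name.
client = boto3.client("lambda")
client.update_function_configuration(
    FunctionName="my-function",
    MemorySize=3008,
)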


NB2: I'm completely ignoring the maximum number of parallel processes here. My use case doesn't have a whole lot of elements to work on (a sketch of one way to cap the process count follows the test code below).


With the code below, usage is:

work = funcmap(yourfunction,listofstufftoworkon)
yourresults = work.run()


Running from my laptop:

jumper@jumperdebian[3333] ~/scripts/tmp  2019-09-04 11:52:30
└─ $ ∙ python3 -c "import tst; tst.lambda_handler(None,None)"
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 9.574460506439209
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 6.422513484954834

Running on AWS:

Function Logs:
START RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97 Version: $LATEST
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 12.135798215866089
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 7.293526887893677
END RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97

Here is the test code:

import time
from multiprocessing import Process, Pipe
import boto3

class funcmap(object):

    fmfunction=None
    fmlist=None

    def __init__(self,pfunction,plist):
        self.fmfunction=pfunction
        self.fmlist=plist

    def calculation(self, pfunction, pload, conn):
        # Run the mapped function on one work item and send (input, result)
        # back to the parent through the pipe.
        panswer = pfunction(pload)
        conn.send([pload, panswer])
        conn.close()

    def run(self):
        datalist = self.fmlist
        processes = []
        parent_connections = []
        # Create one process and one pipe per work item
        for datum in datalist:
            parent_conn, child_conn = Pipe()
            parent_connections.append(parent_conn)
            process = Process(target=self.calculation, args=(self.fmfunction, datum, child_conn,))
            processes.append(process)

        pstart=time.time()
        for process in processes:
            process.start()
            #print("starting at t+ {} s".format(time.time()-pstart))
        for process in processes:
            process.join()
            #print("joining at t+ {} s".format(time.time()-pstart))

        # Collect the (input, result) pairs sent back through the pipes
        results = []
        for parent_connection in parent_connections:
            resp = parent_connection.recv()
            results.append((resp[0], resp[1]))
        return results


def fibo(n):
    if n <= 2 : return 1
    return fibo(n-1)+fibo(n-2)

def lambda_handler(event, context):
    #worklist=[22,23,24,25,26,27,28,29,30,31,32,31,30,29,28,27,26,27,28,29]
    #worklist=[22,23,24,25,26,27,28,29,30]
    worklist=[30,30,30,30]
    #worklist=[30]
    _start = time.time()
    results=[]
    for a in worklist:
        results.append((a,fibo(a)))
    print("results : {}".format(results))
    _end = time.time()
    print("SP runtime : {}".format(_end-_start))

    _mstart = time.time()
    work = funcmap(fibo,worklist)
    results = work.run()
    print("results : {}".format(results))
    _mend = time.time()
    print("MP runtime : {}".format(_mend-_mstart))

Hope that helps.

