Python 3 asyncio with aioboto3 seems sequential

Question

I am porting a simple Python 3 script to AWS Lambda. The script is simple: it gathers information from a dozen S3 objects and returns the results.

The script used multiprocessing.Pool to gather all the files in parallel. However, multiprocessing cannot be used in an AWS Lambda environment, since /dev/shm is missing. So instead of writing a dirty multiprocessing.Process / multiprocessing.Queue replacement, I thought I would try asyncio instead.

I am using the latest version of aioboto3 (8.0.5) on Python 3.8.

My problem is that I cannot seem to gain any improvement between a naive sequential download of the files and an asyncio event loop multiplexing the downloads.

Here are the two versions of my code.

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        object = s3.get_object(Bucket=BUCKET, Key=key)
        object['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())
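
For reference, here is a variant that is not part of the original question: each download wrapped in its own coroutine (the get_object call followed by the body read), so that asyncio.gather multiplexes the whole request/response cycle per object instead of gathering the two phases in lockstep. This is only a sketch and reuses the same aioboto3 8.x client calls as the code above.

async def download_one(s3, key):
    """Fetch a single object and read its body; one coroutine per key."""
    obj = await s3.get_object(Bucket=BUCKET, Key=key)
    return await obj['Body'].read()

async def download_aio_per_key():
    """Sketch (not from the original post): one self-contained coroutine per key."""
    async with aioboto3.client('s3') as s3:
        return await asyncio.gather(*(download_one(s3, key) for key in KEYS))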

The timing for both run_sequential() and run_concurrent() is quite similar (~3 seconds for a dozen 10 MB files, measured roughly as in the sketch after the list below). I am convinced the concurrent version is not actually running concurrently, for multiple reasons:

  • I tried switching to a Process/ThreadPoolExecutor, and the processes/threads are spawned for the duration of the function, though they do nothing
  • The timing between sequential and concurrent is very close to the same, even though my network interface is definitely not saturated and the CPU is not bound either
  • The time taken by the concurrent version increases linearly with the number of files.
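
(A rough way to reproduce the timings above, not from the original post; it only wraps the two entry points already defined in the question with a wall-clock timer.)

import time

def timed(label, fn):
    """Print the wall-clock duration of a callable; rough measurement only."""
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")

timed("sequential", run_sequential)
timed("concurrent", run_concurrent)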

I am sure something is missing, but I just can't wrap my head around what.

Any ideas?

Answer

After losing a few hours trying to understand how to use aioboto3 correctly, I decided to just switch to my backup solution. I ended up rolling my own naive version of multiprocessing.Pool for use within an AWS Lambda environment.

If someone stumbles across this thread in the future, here it is. It is far from perfect, but easy enough to replace multiprocessing.Pool as-is for my simple cases.

from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index, result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args, ))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for jobs to finish
            for pipe in wait(list(map(lambda t: t[2], running))):
                index, result = pipe.recv()
                # Remove the finished job from the running list
                running = list(filter(lambda x: x[0] != index, running))
                # Add the result to the finished list
                finished[index] = result

        return finished
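
A minimal usage sketch, not part of the original answer: download_one and the pool size of 10 are made up for illustration, and BUCKET / KEYS are assumed to be the same constants as in the question's snippet. Each result travels back to the parent through the Pipe, and exceptions are returned as values rather than raised.

import boto3

def download_one(key):
    """Fetch one object's body in a worker process (hypothetical helper)."""
    s3 = boto3.client('s3')
    return s3.get_object(Bucket=BUCKET, Key=key)['Body'].read()

with Pool(process_count=10) as pool:
    bodies = pool.map(download_one, KEYS)

Note that this pool relies on the fork start method (the default on Linux, including AWS Lambda): the wrapper returned by wrap_pipe is a closure and would not be picklable under the spawn start method.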
