带有aioboto3的Python 3 asyncio似乎是顺序的 [英] Python 3 asyncio with aioboto3 seems sequential
问题描述
我正在将一个简单的python 3脚本移植到AWS Lambda. 该脚本很简单:它从十二个S3对象中收集信息并返回结果.
I am porting a simple python 3 script to AWS Lambda. The script is simple: it gathers information from a dozen of S3 objects and returns the results.
该脚本使用multiprocessing.Pool
并行收集所有文件.尽管multiprocessing
不能在AWS Lambda环境中使用,因为缺少/dev/shm
.
因此,我认为不要写脏的multiprocessing.Process
/multiprocessing.Queue
替代品,而是尝试asyncio
.
The script used multiprocessing.Pool
to gather all the files in parallel. Though multiprocessing
cannot be used in an AWS Lambda environment since /dev/shm
is missing.
So I thought instead of writing a dirty multiprocessing.Process
/ multiprocessing.Queue
replacement, I would try asyncio
instead.
我正在Python 3.8上使用最新版本的aioboto3
(8.0.5).
I am using the latest version of aioboto3
(8.0.5) on Python 3.8.
我的问题是,在天真的顺序下载文件与多路复用下载的异步事件循环之间,我似乎无法获得任何改善.
My problem is that I cannot seem to gain any improvement between a naive sequential download of the files, and an asyncio event loop multiplexing the downloads.
这是我的代码的两个版本.
Here are the two versions of my code.
import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import boto3
import aioboto3
BUCKET = 'some-bucket'
KEYS = [
'some/key/1',
[...]
'some/key/10',
]
async def download_aio():
"""Concurrent download of all objects from S3"""
async with aioboto3.client('s3') as s3:
objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
objects = await asyncio.gather(*objects)
buffers = await asyncio.gather(*[o['Body'].read() for o in objects])
def download():
"""Sequentially download all objects from S3"""
s3 = boto3.client('s3')
for key in KEYS:
object = s3.get_object(Bucket=BUCKET, Key=key)
object['Body'].read()
def run_sequential():
download()
def run_concurrent():
loop = asyncio.get_event_loop()
#loop.set_default_executor(ProcessPoolExecutor(10))
#loop.set_default_executor(ThreadPoolExecutor(10))
loop.run_until_complete(download_aio())
run_sequential()
和run_concurrent()
的时序非常相似(十几个10MB文件约为3秒).
我确信并发版本不是,原因多种多样:
The timing for both run_sequential()
and run_concurrent()
are quite similar (~3 seconds for a dozen of 10MB files).
I am convinced the concurrent version is not, for multiple reasons:
- 我尝试切换到
Process/ThreadPoolExecutor
,并且在函数运行期间生成了进程/线程,尽管它们没有执行任何操作 - 顺序和并发之间的时间几乎相同,尽管我的网络接口绝对没有饱和,并且CPU也未绑定
- 并发版本花费的时间随着文件数量的增加而线性增加.
- I tried switching to
Process/ThreadPoolExecutor
, and I the processes/threads spawned for the duration of the function, though they are doing nothing - The timing between sequential and concurrent is very close to the same, though my network interface is definitely not saturated, and the CPU is not bound either
- The time taken by the concurrent version increases linearly with the number of files.
我确定有些东西丢失了,但是我无法将自己的头包裹在周围.
I am sure something is missing, but I just can't wrap my head around what.
有什么想法吗?
推荐答案
在花了几个小时试图理解如何正确使用aioboto3
之后,我决定只切换到我的备份解决方案.
我最终发布了自己的multiprocessing.Pool
天真版本,可在AWS Lambda环境中使用.
After loosing some hours trying to understand how to use aioboto3
correctly, I decided to just switch to my backup solution.
I ended up rolling my own naive version of multiprocessing.Pool
for use within an AWS lambda environment.
如果将来有人偶然发现此线程,那就是.它远非完美,但对于我的简单案例而言,可以很容易地按原样替换multiprocessing.Pool
.
If someone stumble across this thread in the future, here it is. It is far from perfect, but easy enough to replace multiprocessing.Pool
as-is for my simple cases.
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
class Pool:
"""Naive implementation of a process pool with mp.Pool API.
This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
is not mounted in an AWS Lambda environment.
"""
def __init__(self, process_count=1):
assert process_count >= 1
self.process_count = process_count
@staticmethod
def wrap_pipe(pipe, index, func):
def wrapper(args):
try:
result = func(args)
except Exception as exc: # pylint: disable=broad-except
result = exc
pipe.send((index, result))
return wrapper
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
def map(self, function, arguments):
pending = list(enumerate(arguments))
running = []
finished = [None] * len(pending)
while pending or running:
# Fill the running queue with new jobs
while len(running) < self.process_count:
if not pending:
break
index, args = pending.pop(0)
pipe_parent, pipe_child = Pipe(False)
process = Process(
target=Pool.wrap_pipe(pipe_child, index, function),
args=(args, ))
process.start()
running.append((index, process, pipe_parent))
# Wait for jobs to finish
for pipe in wait(list(map(lambda t: t[2], running))):
index, result = pipe.recv()
# Remove the finished job from the running list
running = list(filter(lambda x: x[0] != index, running))
# Add the result to the finished list
finished[index] = result
return finished
这篇关于带有aioboto3的Python 3 asyncio似乎是顺序的的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!