aioboto3 speedup not as expected


Problem description

I'm trying the aioboto3 lib, which looks extremely promising for speeding certain tasks up. For example, I need to find tags for all S3 objects within a particular bucket and prefix. But unfortunately the speed increase is not what I had hoped for.

With 1000 objects, it's about half the time. With 8000 objects, it was about the same time! This was run on a c3.8xlarge EC2 instance.

Code:

import asyncio
import aioboto3
from boto3.dynamodb.conditions import Key
import boto3
import logging

import time

bucket = 'my-bucket'  # placeholder: the original snippet never defines the bucket name

def dbg(*args):
    print(args[0] % args[1:])

def avg(l):
    return sum(l) / len(l)

def get_versions(count):
    s3cli = boto3.client('s3')

    r = s3cli.list_object_versions(Bucket=bucket)
    l = r['Versions']
    while True:
        if not r['IsTruncated'] or len(l) >= count:
            return l[:count]
        r = s3cli.list_object_versions(Bucket=bucket,KeyMarker=r['NextKeyMarker'],VersionIdMarker=r['NextVersionIdMarker'])
        l.extend(r['Versions'])


def try_s3_sync(versions):

    s3cli = boto3.client('s3')

    t = time.time()
    rtags = []
    for ver in versions:
        rtag = s3cli.get_object_tagging(Bucket=bucket,Key=ver['Key'],VersionId=ver['VersionId'])
        rtags.append(rtag)

    elapsed = time.time() - t

    dbg("sync elapsed <%s>",elapsed)
    return elapsed


async def a_try_s3(versions):

    async with aioboto3.client('s3') as s3cli:

        t = time.time()
        futures = [asyncio.ensure_future(s3cli.get_object_tagging(Bucket=bucket, Key=ver['Key'], VersionId=ver['VersionId']))
                   for ver in versions]
        # asyncio.wait returns (done, pending) sets of futures, not results
        done, pending = await asyncio.wait(futures)
        rtags = [f.result() for f in done]

        elapsed = time.time() - t

        dbg("async elapsed <%s>",elapsed)
        return elapsed


def try_s3_async(versions):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(a_try_s3(versions))


# -------------------------------------------


if __name__ == '__main__':

    for num in (1000,8000):
        versions = get_versions(num)
        dbg("len(versions) <%s>",len(versions))

        tries = 3
        dbg('avg for sync: %s',avg(list(try_s3_sync(versions) for _ in range(tries))))
        dbg('avg for async: %s',avg(list(try_s3_async(versions) for _ in range(tries))))

Output:

len(versions) <1000>
sync elapsed <19.383010864257812>
sync elapsed <20.18708372116089>
sync elapsed <20.515722513198853>
avg for sync: 20.028605699539185
async elapsed <13.05319333076477>
async elapsed <7.40950345993042>
async elapsed <9.881770372390747>
avg for async: 10.114822387695312
len(versions) <8000>
sync elapsed <168.69372606277466>
sync elapsed <158.15257668495178>
sync elapsed <167.32361602783203>
avg for sync: 164.7233062585195
async elapsed <158.08434414863586>
async elapsed <165.93541312217712>
async elapsed <165.63341856002808>
avg for async: 163.21772527694702

Any suggestions are appreciated.

Recommended answer

You can't just dump thousands of tasks onto the event loop at once, as that slows your whole program down. You need to do something like this:

from asyncpool import AsyncPool  # third-party: pip install asyncpool
import aiobotocore.session
from aiobotocore.config import AioConfig

async def a_try_s3(versions):
    max_parallel_tasks = 100
    session = aiobotocore.session.AioSession()
    # Size the HTTP connection pool to match the worker count.
    config = AioConfig(max_pool_connections=max_parallel_tasks)
    logger = logging.getLogger("ExamplePool")
    rtags = []

    async with session.create_client('s3', config=config) as s3cli:

        # Wrapper so each pushed work item only needs Key and VersionId.
        async def get_object_tagging(*, Key, VersionId):
            rtag = await s3cli.get_object_tagging(Bucket=bucket, Key=Key, VersionId=VersionId)
            rtags.append(rtag)

        t = time.time()

        async with AsyncPool(asyncio.get_event_loop(), num_workers=max_parallel_tasks,
                             name="WorkPool", logger=logger,
                             worker_co=get_object_tagging,
                             log_every_n=100, raise_on_join=True) as pool:
            for ver in versions:
                await pool.push(Key=ver['Key'], VersionId=ver['VersionId'])

        # Exiting the AsyncPool context joins the workers, so every request
        # has completed by the time the end timestamp is taken.
        elapsed = time.time() - t

    dbg("async elapsed <%s>", elapsed)
    return elapsed
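The same bounded-concurrency idea also works with the standard library alone, without asyncpool: an asyncio.Semaphore caps how many requests are in flight at once. The sketch below is illustrative only — bounded_gather and fake_get_tagging are made-up names, with the fake worker standing in for s3cli.get_object_tagging so the concurrency bound can be demonstrated without AWS.

```python
import asyncio

async def bounded_gather(items, worker, limit):
    # The semaphore allows at most `limit` worker coroutines to run at once;
    # the rest wait at `async with sem` until a slot frees up.
    sem = asyncio.Semaphore(limit)

    async def run(item):
        async with sem:
            return await worker(item)

    # gather preserves input order in its results.
    return await asyncio.gather(*(run(i) for i in items))

# Instrumented stand-in for s3cli.get_object_tagging: records peak concurrency.
peak = 0
active = 0

async def fake_get_tagging(key):
    global peak, active
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.001)  # simulate network latency
    active -= 1
    return {"Key": key, "TagSet": []}

results = asyncio.run(bounded_gather([f"obj{i}" for i in range(500)],
                                     fake_get_tagging, limit=20))
print(f"objects tagged: {len(results)}, peak concurrency: {peak}")
```

For the real workload you would pass a coroutine that awaits s3cli.get_object_tagging, and pick a limit no larger than the client's max_pool_connections.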

