AIOHTTP-不推荐使用Application.make_handler(...)-添加多处理 [英] AIOHTTP - Application.make_handler(...) is deprecated - Adding Multiprocessing

查看:108
本文介绍了AIOHTTP-不推荐使用Application.make_handler(...)-添加多处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我走了一段旅程:我可以从Python网络服务器中挤出多少性能?"这导致我进入AIOHTTP和uvloop.不过,我仍然可以看到AIOHTTP并未充分利用我的CPU.我开始将多重处理与AIOHTTP一起使用.我了解到,有一个Linux内核功能,该功能允许多个进程共享同一TCP端口.这使我开发了以下代码(效果很好):

I went down a journey of "How much performance can I squeeze out of a Python web-server?" This lead me to AIOHTTP and uvloop. Still, I could see that AIOHTTP wasn't using my CPU to its full potential. I set out to use multiprocessing with AIOHTTP. I learned that there's a Linux kernel feature that allows multiple processes to share the same TCP port. This lead me to develop the following code (Which works wonderfully):

import asyncio
import os
import socket
import time
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)


def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if reuseport:
        SO_REUSEPORT = 15
        sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
    sock.bind((host, port))
    return sock


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    pid = os.getpid()
    text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
        time.time(), name, pid)
    #time.sleep(5)  # intentionally blocking sleep to simulate CPU load
    return web.Response(text=text)


def start_server():
    host = "127.0.0.1"
    port=8000
    reuseport = True
    app = web.Application()
    sock = mk_socket(host, port, reuseport=reuseport)
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])
    loop = asyncio.get_event_loop()
    coro = loop.create_server(
            protocol_factory=app.make_handler(),
            sock=sock,
        )
    srv = loop.run_until_complete(coro)
    loop.run_forever()


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        for i in range(0, CPU_COUNT):
            executor.submit(start_server)

在应用此代码之前,

我的网站的wrk基准测试:

wrk benchmark of my site before applying this code:

Running 30s test @ http://127.0.0.1:8000/
  12 threads and 400 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    54.33ms    6.54ms 273.24ms   89.95%
    Req/Sec   608.68    115.97     2.27k    83.63%
  218325 requests in 30.10s, 41.23MB read
  Non-2xx or 3xx responses: 218325
Requests/sec:   7254.17
Transfer/sec:      1.37MB

之后的wrk基准测试

Running 30s test @ http://127.0.0.1:8000/
  12 threads and 400 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    15.96ms    7.27ms  97.29ms   84.78%
    Req/Sec     2.11k   208.30     4.45k    75.50%
  759290 requests in 30.08s, 153.51MB read
Requests/sec:  25242.39
Transfer/sec:      5.10MB

哇!但是有一个问题:

DeprecationWarning: Application.make_handler(...) is deprecated, use AppRunner API instead
  protocol_factory=app.make_handler()

所以我尝试了这个:

import asyncio
import os
import socket
import time
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)

def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if reuseport:
        SO_REUSEPORT = 15
        sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
    sock.bind((host, port))
    return sock

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    pid = os.getpid()
    text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
        time.time(), name, pid)
    #time.sleep(5)  # intentionally blocking sleep to simulate CPU load
    return web.Response(text=text)

async def start_server():
    host = "127.0.0.1"
    port=8000
    reuseport = True
    app = web.Application()
    sock = mk_socket(host, port, reuseport=reuseport)
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])

    coro = loop.create_server(
            protocol_factory=app.make_handler(),
            sock=sock,
        )
    runner = web.AppRunner(app)
    await runner.setup()
    srv = web.TCPSite(runner, 'localhost', 8000)
    await srv.start()
    print('Server started at http://127.0.0.1:8000')
    return coro, app, runner

async def finalize(srv, app, runner):
    sock = srv.sockets[0]
    app.loop.remove_reader(sock.fileno())
    sock.close()

    #await handler.finish_connections(1.0)
    await runner.cleanup()
    srv.close()
    await srv.wait_closed()
    await app.finish()

def init():
    loop = asyncio.get_event_loop()
    srv, app, runner = loop.run_until_complete(init)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete((finalize(srv, app, runner)))


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        for i in range(0, CPU_COUNT):
            executor.submit(init)

这显然是不完整的,因为未使用coro.我不确定在哪里将套接字与AppRunner集成在一起.答案应显示修改为使用App Runner的原始示例.

which is obviously incomplete becuase coro isn't being used. I'm not sure where to integrate the socket with AppRunner. Answer should show original example modified to use App Runner.

推荐答案

由于这是我第一次使用协程和aiohttp,所以我可能是错的,但它似乎可以与SockSite一起使用:

As it's my first time using coroutines and aiohttp, I may be wrong, but it seems to work with a SockSite:

#!/usr/bin/env python

import asyncio
import os
import socket
import time
import traceback
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)

def mk_socket(host="127.0.0.1", port=9090, reuseport=False):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if reuseport:
        SO_REUSEPORT = 15
        sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
    sock.bind((host, port))
    return sock

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    pid = os.getpid()
    text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
        time.time(), name, pid)
    #time.sleep(5)  # intentionally blocking sleep to simulate CPU load
    return web.Response(text=text)

async def start_server():
    try:
        host = "127.0.0.1"
        port=9090
        reuseport = True
        app = web.Application()
        app.add_routes([web.get('/', handle),
                        web.get('/{name}', handle)])
        runner = web.AppRunner(app)
        await runner.setup()
        sock = mk_socket(host, port, reuseport=reuseport)
        srv = web.SockSite(runner, sock)
        await srv.start()
        print('Server started at http://127.0.0.1:9090')
        return srv, app, runner
    except Exception:
        traceback.print_exc()
        raise

async def finalize(srv, app, runner):
    sock = srv.sockets[0]
    app.loop.remove_reader(sock.fileno())
    sock.close()

    #await handler.finish_connections(1.0)
    await runner.cleanup()
    srv.close()
    await srv.wait_closed()
    await app.finish()

def init():
    loop = asyncio.get_event_loop()
    srv, app, runner = loop.run_until_complete(start_server())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete((finalize(srv, app, runner)))


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        for i in range(0, CPU_COUNT):
            executor.submit(init)

最终:

>curl http://127.0.0.1:9090
1580741746.47: Hello Anonymous! Process 54623 is treating you
>curl http://127.0.0.1:9090
1580741747.05: Hello Anonymous! Process 54620 is treating you
>curl http://127.0.0.1:9090
1580741747.77: Hello Anonymous! Process 54619 is treating you
>curl http://127.0.0.1:9090
1580741748.36: Hello Anonymous! Process 54621 is treating you

我还在finalize例程中添加了一个日志,它似乎已正确触发.

I also added a log in the finalize routine and it seems correctly triggered.

出于好奇,我尝试在较旧的内核上进行了尝试,并且确认启用该选项后,它不起作用(与False配合使用).

And, out of curiosity, I gave it a try on an older kernel and I confirm it doesn't work when the option is enabled (it works with False).

这篇关于AIOHTTP-不推荐使用Application.make_handler(...)-添加多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆