异步多个并发服务器 [英] asyncio multiple concurrent servers

查看:98
本文介绍了异步多个并发服务器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Python的asyncio一起运行多个服务器,并在它们之间传递数据。对于我的特定情况,我需要一个带有websocket的Web服务器,与外部设备的UDP连接以及数据库和其他交互方式。我可以单独找到几乎所有这些示例,但是我正在努力找出正确的方法来使它们在数据之间同时运行并同时运行。

I'm trying to use Python's asyncio to run multiple servers together, passing data between them. For my specific case I need a web server with websockets, a UDP connection to an external device, as well as database and other interactions. I can find examples of pretty much any of these individually but I'm struggling to work out the correct way to have them run concurrently with data being pushed between them.

我在这里找到的最接近的是:在asyncio协议/服务器之间进行通信(尽管我一直无法使其运行在Python 3.6)

The closest I have found here is here: Communicate between asyncio protocol/servers (although I've been unable to make it run on Python 3.6)

一个更具体的示例:我如何从> https://github.com/aio-libs/aiohttp

For a more concrete example: How would I take the following aiohttp example code from https://github.com/aio-libs/aiohttp:

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.MsgType.text:
            await ws.send_str("Hello, {}".format(msg.data))
        elif msg.type == web.MsgType.binary:
            await ws.send_bytes(msg.data)
        elif msg.type == web.MsgType.close:
            break

    return ws


app = web.Application()
app.router.add_get('/echo', wshandler)
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

web.run_app(app)

和以下TCP回显服务器示例( http:// asyncio .readthedocs.io / en / latest / tcp_echo.html ):

and the following TCP echo server sample (http://asyncio.readthedocs.io/en/latest/tcp_echo.html):

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

将它们组合成一个脚本,其中通过websockets或TCP echo服务器接收到的任何消息都发送到其中一个的所有客户端?

and combine them into a single script where any messages received via either websockets or the TCP echo server were sent out to all clients of either?

我该如何添加一个(例如)每秒向所有客户端发送一条消息的代码(为了争辩当前时间戳)?

And how would I add a piece of code that (say) every second sent a message to all clients (for the sake of argument the current timestamp)?

解决方案

首先,您需要将所有协程放入单个事件循环中。您可以先避免使用方便的API(例如 run_app )为您启动事件循环。代替 web.run_app(app),写一些类似的东西:

First you need to get all of your coroutines into a single event loop. You can start by avoiding convenience APIs that start the event loop for you such as run_app. Instead of web.run_app(app), write something like:

runner = aiohttp.web.AppRunner(app)
loop.run_until_complete(runner.setup())
# here you can specify the listen address and port
site = aiohttp.web.TCPSite(runner)    
loop.run_until_complete(site.start())

然后运行回显服务器设置,并且都准备共享asyncio事件循环。在脚本末尾,使用 loop.run_forever()(或在您的应用程序中有意义的任何其他方式)启动事件循环。

Then run the echo server setup, and both are ready to share the asyncio event loop. At the end of the script, start the event loop using loop.run_forever() (or in any other way that makes sense in your application).

要向客户端广播信息,请创建广播协程并将其添加到事件循环:

To broadcast information to clients, create a broadcast coroutine and add it to the event loop:

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

async def broadcast():
    global broadcast_data
    while True:
        broadcast_data.set_result(datetime.datetime.now())
        broadcast_data = loop.create_future()
        await asyncio.sleep(1)

loop.create_task(broadcast())

最后,等待为客户创建的每个协程中的广播,例如 handle_echo

Finally, await the broadcast in each coroutine created for a client, such as handle_echo:

def handle_echo(r, w):
    while True:
        data = await broadcast_data
        # data contains the broadcast datetime - send it to the client
        w.write(str(data))

修改websockets处理程序协程以等待并以相同方式中继广播数据应该很简单。

It should be straightforward to modify the websockets handler coroutine to await and relay the broadcast data in the same manner.

这篇关于异步多个并发服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆