asyncio:运行一个函数,该函数线程与来自websocket客户端的多个请求 [英] asyncio: run one function threaded with multiple requests from websocket clients

查看:104
本文介绍了asyncio:运行一个函数,该函数线程与来自websocket客户端的多个请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个websocket服务器(python 3.x)接受请求,每个请求都是一个url变量.它运行得很好,只是它只依次串行地执行每个请求. 该函数正在运行时,它还会阻止尝试连接的客户端.我想要的是非阻止!

I have a websocket server (python 3.x) taking requests where each is a url variable. It runs just fine except it only executes each request in serial, after one another. While the function is running it also blocks the client(s) trying to connect. Non-blocking is what i want!

    websocket和子流程功能的
  • 异步多处理线程.
  • 设置要使用的内核数的能力.但这不是必须的.
  • Asyncronous multiprocessed threading of both websocket and subprocess function.
  • The ability to set the number of cores to use. This is not obligatory though.

这就是我所拥有的:

所以,我对这种挫败感并没有走多远.我回到了原始代码,事实证明,您需要使用await asyncio.sleep(.001)休眠该函数.现在它运行得非常好,我同时与多个客户端进行了测试,并且可以异步处理它.

So, I didn't get very far with this frustration. I reverted back to my original code and as it turns out, you need to sleep the function with await asyncio.sleep(.001). Now it runs perfectly fine, I tested with multiple clients at the same time and it handles it asynchronously.

import asyncio, websockets, json
async def handler(websocket, path):
    print("New client connected.")
    await websocket.send('CONNECTED')
    try:
        while True:
            inbound = await websocket.recv()
            if inbound is None:
                break
            while inbound != None:
                import time
                for line in range(10):
                    time.sleep(1)
                    data = {}
                    data['blah'] = line
                    await asyncio.sleep(.000001) # THIS
                    print(data)
                    await websocket.send(json.dumps(data))
                await websocket.send(json.dumps({'progress': 'DONE'}))
                break
    except websockets.exceptions.ConnectionClosed:
        print("Client disconnected.")
if __name__ == "__main__":
    server = websockets.serve(handler, '0.0.0.0', 8080)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

更新:,如 @udi 所建议,如果您想要一个缓慢的外部进程,则方法是asyncio.subprocess而不是subprocess.通过阻塞调用从管道读取将使其他线程停滞,这是asyncio.subprocess负责的工作.

Update: as suggested by @udi, if you want a slow external process, the way to go is asyncio.subprocess and not subprocess. Reading from pipe with a blocking call stalls the other threads, which is what asyncio.subprocess takes care of.

推荐答案

time.sleep()正在阻止.

尝试:

# blocking_server.py
import asyncio
import time

import websockets

x = 0


async def handler(websocket, path):
    global x
    x += 1
    client_id = x

    try:
        print("[#{}] Connected.".format(client_id))

        n = int(await websocket.recv())
        print("[#{}] Got: {}".format(client_id, n))
        for i in range(1, n + 1):
            print("[#{}] zzz...".format(client_id))
            time.sleep(1)
            print("[#{}] woke up!".format(client_id))
            await asyncio.sleep(.001)
            msg = "*" * i
            print("[#{}] sending: {}".format(client_id, msg))
            await websocket.send(msg)

        msg = "bye!"
        print("[#{}] sending: {}".format(client_id, msg))
        await websocket.send(msg)

        print("[#{}] Done.".format(client_id, msg))

    except websockets.exceptions.ConnectionClosed:
        print("[#{}] Disconnected!.".format(client_id))


if __name__ == "__main__":
    port = 8080
    server = websockets.serve(handler, '0.0.0.0', port)
    print("Started server on port {}".format(port))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

使用此测试客户端:

# test_client.py
import asyncio
import time

import websockets


async def client(client_id, n):
    t0 = time.time()
    async with websockets.connect('ws://localhost:8080') as websocket:
        print("[#{}] > {}".format(client_id, n))
        await websocket.send(str(n))
        while True:
            resp = await websocket.recv()
            print("[#{}] < {}".format(client_id, resp))
            if resp == "bye!":
                break

    print("[#{}] Done in {:.2f} seconds".format(client_id, time.time() - t0))


tasks = [client(i + 1, 3) for i in range(4)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

现在比较将time.sleep(x)替换为await asyncio.sleep(x)时的结果!

Now compare the result when time.sleep(x) is replaced with await asyncio.sleep(x)!

如果您需要通过asyncio运行缓慢的外部进程,请尝试 :

If you need to run a slow external process via asyncio, try asynico.subprocess:

外部程序示例:

# I am `slow_writer.py`
import sys
import time

n = int(sys.argv[1])

for i in range(1, n + 1):
    time.sleep(1)
    print("*" * i)

使用此服务器:

# nonblocking_server.py

import asyncio
import sys

import websockets

x = 0


async def handler(websocket, path):
    global x
    x += 1
    client_id = x

    try:
        print("[#{}] Connected.".format(client_id))

        n = int(await websocket.recv())

        print("[#{}] Got: {}. Running subprocess..".format(client_id, n))

        cmd = (sys.executable, 'slow_writer.py', str(n))
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE)

        async for data in proc.stdout:
            print("[#{}] got from subprocess, sending: {}".format(
                client_id, data))
            await websocket.send(data.decode().strip())

        return_value = await proc.wait()
        print("[#{}] Subprocess done.".format(client_id))

        msg = "bye!"
        print("[#{}] sending: {}".format(client_id, msg))
        await websocket.send(msg)

        print("[#{}] Done.".format(client_id, msg))

    except websockets.exceptions.ConnectionClosed:
        print("[#{}] Disconnected!.".format(client_id))


if __name__ == "__main__":

    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)

    port = 8080
    server = websockets.serve(handler, '0.0.0.0', port)
    print("Started server on port {}".format(port))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

这篇关于asyncio:运行一个函数,该函数线程与来自websocket客户端的多个请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆