是否可以在python中同时运行多个asyncio? [英] Is it possible to run multiple asyncio in the same time in python?

查看:175
本文介绍了是否可以在python中同时运行多个asyncio?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

基于我得到的解决方案:在 python 中使用 asyncio 运行多个套接字

Based on the solution that i got: Running multiple sockets using asyncio in python

我也尝试使用 asyncio 添加计算部分

i tried to add also the computation part using asyncio

设置:Python 3.7.4

Setup: Python 3.7.4

import msgpack
import threading
import os
import asyncio
import concurrent.futures
import functools
import nest_asyncio
nest_asyncio.apply()

class ThreadSafeElem(bytes):
  def __init__(self, * p_arg, ** n_arg):
     self._lock = threading.Lock()
  def __enter__(self):
     self._lock.acquire()
     return self
  def __exit__(self, type, value, traceback):
     self._lock.release()

elem = ThreadSafeElem()

async def serialize(data):
   return msgpack.packb(data, use_bin_type=True)
async def serialize1(data1):
   return msgpack.packb(data1, use_bin_type=True)

async def process_data(data,data1):
   loop = asyncio.get_event_loop()
   future = await loop.run_in_executor(None, functools.partial(serialize, data))
   future1 = await loop.run_in_executor(None, functools.partial(serialize1, data1))
   return   await asyncio.gather(future,future1)

 ################ Calculation#############################
def calculate_data():
  global elem
  while True:
      try:
          ... data is calculated (some dictionary))...
          elem, elem1= asyncio.run(process_data(data, data1))
      except:
          pass
#####################################################################
def get_data():
  return elem
def get_data1():
  return elem1
########### START SERVER AND get data contionusly ################
async def client_thread(reader, writer):
  while True:
    try:
        bytes_received = await reader.read(100) 
        package_type = np.frombuffer(bytes_received, dtype=np.int8)
        if package_type ==1 :
           nn_output = get_data1()
        if package_type ==2 :
           nn_output = get_data()               
        writer.write(nn_output)
        await writer.drain()
    except:
        pass

async def start_servers(host, port):
  server = await asyncio.start_server(client_thread, host, port)
  await server.serve_forever()

async def start_calculate():
  await asyncio.run(calculate_data())

def enable_sockets():
 try:
    host = '127.0.0.1'
    port = 60000
    sockets_number = 6
    loop = asyncio.get_event_loop()
    for i in range(sockets_number):
        loop.create_task(start_servers(host,port+i))
    loop.create_task(start_calculate())
    loop.run_forever()
except:
    print("weird exceptions")
##############################################################################

enable_sockets()   

问题是当我从客户端打电话时,服务器没有给我任何东西.

The issue is that when i make a call from client, the server does not give me anything.

我用虚拟数据测试了程序,计算部分没有 asyncio,所以没有这个 loop.create_task(start_calculate()) 并且服务器正确响应.

I tested the program with dummy data and no asyncio on calculation part so without this loop.create_task(start_calculate()) and the server responded correctly.

我还运行了计算数据,而没有将其添加到启用套接字中,并且运行正常.它也适用于这个实现,但问题是服务器没有返回任何东西.

I also run the calculate data without adding it in the enable sockets and it worked. It also working with this implementation, but the problem is the server is not returning anything.

我是这样做的,因为我需要计算部分连续运行,并且当其中一个客户端调用时返回数据.

I did it like this cos i need the calculate part to run continuously and when one of the clients is calling to return the data at that point.

推荐答案

asyncio 事件循环不能嵌套在另一个事件循环中,这样做没有意义:asyncio.run(和类似的)阻塞当前线程直到完成.这不会增加并行度,只会禁用任何外部事件循环.

An asyncio event loop cannot be nested inside another, and there is no point in doing so: asyncio.run (and similar) blocks the current thread until done. This does not increase parallelism, and merely disables any outer event loop.

如果你想嵌套另一个 asyncio 任务,直接在当前事件循环中运行它.如果你想运行一个非合作的阻塞任务,在事件循环执行器中运行它.

If you want to nest another asyncio task, directly run it in the current event loop. If you want to run a non-cooperative, blocking task, run it in the event loop executor.

async def start_calculate():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, calculate_data)

默认执行器使用线程——这允许运行阻塞任务,但不会增加并行度.使用自定义 ProcessPoolExecutor 来使用额外的内核:

The default executor uses threads – this allows running blocking tasks, but does not increase parallelism. Use a custom ProcessPoolExecutor to use additional cores:

import concurrent.futures

async def start_calculate():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        await loop.run_in_executor(pool, calculate_data)

这篇关于是否可以在python中同时运行多个asyncio?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆