如何将第三方库中的函数转换为异步? [英] How to convert a function in a third party library to be async?

查看:77
本文介绍了如何将第三方库中的函数转换为异步?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Raspberry Pi和 pigpio websockets 库.

我希望我的程序异步运行(即,我将使用 async def main 作为入口点).

pigpio 库需要同步的回调函数可以响应事件而被调用,这很好,但是从该回调中,我想从 websocket 库中调用另一个 asynchronous 函数.

因此它看起来像:

  def sync_cb():#<-无法将其设为异步,因此我无法使用await[connected_ws中ws的ws.send('test')]#<-这是异步的,必须等待 

目前,我可以使用它:

  def sync_cb():asyncio.run(asyncio.wait([connected_ws中ws的[ws.send('test')])) 

文档表示这种用法不鼓励 asyncio.run .

因此,我的同步回调需要调用 ws.send (也可以从解决方案

是否可以在需要同步回调的地方替换异步函数

有可能.您可以在单独的线程中运行事件循环,并在其中发出 async 代码,但是您必须考虑使用GIL.

 导入异步导入线程类门户:def __init __(self,stop_event):self.loop = asyncio.get_event_loop()self.stop_event =停止事件异步def _call(self,fn,args,kwargs):返回等待fn(* args,** kwargs)异步def _stop():self.stop_event.set()def呼叫(self,fn,* args,** kwargs):返回asyncio.run_coroutine_threadsafe(self._call(fn,args,kwargs),self.loop)def停止(自己):返回self.call(self._stop)def create_portal():门户=无异步def wait_stop():非本地门户stop_event = asyncio.Event()门户=门户(stop_event)running_event.set()等待stop_event.wait()def run():asyncio.run(wait_stop())running_event = threading.Event()线程= threading.Thread(目标=运行)thread.start()running_event.wait()返回门户 

用法示例:

 异步定义测试(msg):等待asyncio.sleep(0.5)打印(味精)返回"HELLO"+味精#它将在单独的线程中运行一个新的事件循环门户= create_portal()#它会在单独的线程中调用`test`并返回Futureprint(portal.call(test,"WORLD").result())portal.stop().result() 

在您的情况下:

  def sync_cb():调用= [connected_ws中ws的portal.call(ws.send,'test')]#如果您想从这些调用中获得结果:#[c.result()for call in c] 

而且,仅通过调用一个简单的异步方法,使用asyncio.run和asyncio.wait会产生什么样的开销

asyncio.run 将创建一个新的事件循环,然后将其关闭.如果不经常调用回调,则很可能不会有问题.但是,如果您还要在另一个回调中使用 asyncio.run ,则它们将无法同时工作.

I am using my Raspberry Pi and the pigpio and websockets libraries.

I want my program to run asynchronously (i.e. I will use async def main as the entry point).

The pigpio library expects a synchronous callback function to be called in response to events, which is fine, but from within that callback I want to call another, asynchronous function from the websocket library.

So it would look like:

def sync_cb(): # <- This can not be made async, therefore I can not use await
   [ws.send('test') for ws in connected_ws] # <- This is async and has to be awaited

Currently I can get it to work with:

def sync_cb():
    asyncio.run(asyncio.wait([ws.send('test') for ws in connected_ws]))

but the docs say this use of asyncio.run is discouraged.

So my synchronous callback needs to call ws.send (also from a third party library) which is async from a function that is synchronous.

Another option that works is:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.gather(*[ws.send(json.dumps(message)) for ws in connected_ws]))

But the three lines of creating and setting an even loop sounds like a lot just to run a simple async function.

My questions are:

  • Is it possible to substitute an async function where a synchronous callback is required (i.e. is there a way to make cb async in this example)
  • And, what kind of overhead am I incurring by using asyncio.run and asyncio.wait just to call a simple async method (in the list comprehension)

解决方案

Is it possible to substitute an async function where a synchronous callback is required

It is possible. You can run event loop in separate thread and emit async code there, but you have to consider GIL.

import asyncio
import threading

class Portal:

    def __init__(self, stop_event):
        self.loop = asyncio.get_event_loop()
        self.stop_event = stop_event

    async def _call(self, fn, args, kwargs):
        return await fn(*args, **kwargs)

    async def _stop(self):
        self.stop_event.set()

    def call(self, fn, *args, **kwargs):
        return asyncio.run_coroutine_threadsafe(self._call(fn, args, kwargs), self.loop)

    def stop(self):
        return self.call(self._stop)

def create_portal():
    portal = None

    async def wait_stop():
        nonlocal portal
        stop_event = asyncio.Event()
        portal = Portal(stop_event)
        running_event.set()
        await stop_event.wait()

    def run():
        asyncio.run(wait_stop())

    running_event = threading.Event()
    thread = threading.Thread(target=run)
    thread.start()
    running_event.wait()

    return portal

Usage example:

async def test(msg):
    await asyncio.sleep(0.5)
    print(msg)
    return "HELLO " + msg

# it'll run a new event loop in separate thread
portal = create_portal()
# it'll call `test` in the separate thread and return a Future 
print(portal.call(test, "WORLD").result())
portal.stop().result()

In your case:

def sync_cb():
    calls = [portal.call(ws.send, 'test') for ws in connected_ws]
    # if you want to get results from these calls:
    # [c.result() for c in calls]

And, what kind of overhead am I incurring by using asyncio.run and asyncio.wait just to call a simple async method

asyncio.run will create a new event loop and close it then. Most likely if the callback is not called often it won't be a problem. But if you will use asyncio.run in another callback too, then they won't be able to work concurrently.

这篇关于如何将第三方库中的函数转换为异步?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆