Using threads in combination with asyncio


Problem description

I was looking for a way to spawn different threads (in my actual program the number of threads can change during execution) to perform an endless-running operation which would block my whole application for (at worst) a couple of seconds while it runs.
Because of this, I'm using the standard Thread class and asyncio (because other parts of my program are already using it).

This seems to work well, and according to this thread it seems to be okay. However, when searching for asynchronous threading and asyncio, I often stumble across the suggestion to use ProcessPoolExecutor (e.g. in this Stack Overflow post). Now I'm wondering whether the following approach is really good practice (or even dangerous)?

import asyncio
from threading import Thread

class Scanner:
  def __init__(self):
    # Start a new Scanning Thread
    self.scan_thread = Thread(target=self.doScan, args=())
    self.scan_thread.start()

  def doScan(self):
    print("Started scanning")
    loop = asyncio.new_event_loop()
    loop.run_until_complete(self.connection())
    print("Stopped scanning")

list_of_scanner = []
list_of_scanner.append(Scanner())
list_of_scanner.append(Scanner())

Background: I started questioning this myself because my program started crashing when spawning threads, mostly with the error message RuntimeError: Task <Task pending ...> attached to a different loop. I know that this is not directly linked to the example I gave you, but I guess I started messing up my asyncio coroutines by using these threads.
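
For reference, a minimal sketch of how this kind of error typically arises (a standalone example, not taken from my code): a future created for one event loop being awaited by a task running on another loop.

import asyncio

# A future bound to one event loop...
loop_a = asyncio.new_event_loop()
shared_future = loop_a.create_future()

async def waiter():
    # ...awaited by a task that runs on a *different* loop.
    await shared_future

loop_b = asyncio.new_event_loop()
# Raises RuntimeError: Task <...> got Future <...> attached to a different loop
loop_b.run_until_complete(waiter())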

Edit

For clarification I want to add why I'm using this weird construct of asyncio and threads:

  1. I'm using this part of the project hbldh/bleak.
     The part which would run as a thread is basically this:
async def connection():
  async with BleakClient(address, loop=loop) as client:
    x = await client.is_connected()
    while x:
      x = await client.is_connected()
      log.info("Connected: {0}".format(x))

  • What is endlessScan() doing? The name is a bit misleading and it's called differently in my code (I've changed that now). The new name is connection().
    The whole purpose is to establish a link to Bluetooth devices and basically listen to incoming data (like we would do when using sockets). This means that loop.run_until_complete(self.connection()) will NEVER exit unless the Bluetooth device disconnects.
  • Why can't I create one single event loop?
    As said, once a link is established, this function runs endlessly. Each connected device runs such an endless loop. I want to do this in the background. My main application should never have to wait for the routine to finish and must stay responsive under all circumstances. This, for me, justified using threads in combination with asyncio.

  • Edit 2: Added my testing code based on @user4815162342's suggestion. The execution seems to work fine.

        import asyncio
        from threading import Thread, Event, Lock
        import random

        class Scanner:
            def __init__(self, id, loop):
                print("INIT'D %s" % id)
                self.id = id
                self.data_lock = Lock()
                self.event = Event()
                self.raw_data = ""
                # Submit the coroutine only after all attributes exist, because it
                # starts running on the event loop's thread right away.
                self.submit_async(self.update_raw_data(), loop)

            @property
            def raw_data(self):
                with self.data_lock:
                    return self._raw_data

            @raw_data.setter
            def raw_data(self, raw_data):
                self._raw_data = raw_data

            def submit_async(self, awaitable, loop):
                return asyncio.run_coroutine_threadsafe(awaitable, loop)

            async def update_raw_data(self):
                while True:
                    with self.data_lock:
                        self._raw_data = random.random()
                        print("Waken up %s with %s" % (self.id, self._raw_data))
                    await asyncio.sleep(self.id)

        def _start_async():
            loop = asyncio.new_event_loop()
            t = Thread(target=loop.run_forever)
            t.daemon = True
            t.start()
            return loop

        _loop = _start_async()

        def stop_async():
            _loop.call_soon_threadsafe(_loop.stop)

        ble_devices = [Scanner(1, _loop), Scanner(2, _loop), Scanner(4, _loop)]

        # This code never executes...
        for dev in ble_devices:
            print(dev.raw_data)
    
    

    Recommended answer

    I would recommend creating a single event loop in a background thread and have it service all your async needs. It doesn't matter that your coroutines never end; asyncio is perfectly capable of executing multiple such functions in parallel.

    For example:

    import asyncio
    import threading

    def _start_async():
        loop = asyncio.new_event_loop()
        threading.Thread(target=loop.run_forever).start()
        return loop

    _loop = _start_async()
    
    # Submits awaitable to the event loop, but *doesn't* wait for it to
    # complete. Returns a concurrent.futures.Future which *may* be used to
    # wait for and retrieve the result (or exception, if one was raised)
    def submit_async(awaitable):
        return asyncio.run_coroutine_threadsafe(awaitable, _loop)
    
    def stop_async():
        _loop.call_soon_threadsafe(_loop.stop)
    

    With these tools in place (and possibly in a separate module), you can do things like this:

    class Scanner:
        def __init__(self):
            submit_async(self.connection())
            # ...
    
        # ...
    

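    If you ever need a value back from the background loop, the concurrent.futures.Future returned by submit_async can be used for that. A minimal sketch, assuming the submit_async/_loop helpers above are in place (fetch_status is a made-up placeholder coroutine, not part of the original code):

    import asyncio
    import concurrent.futures

    async def fetch_status():
        # Placeholder for real async work done on the background loop.
        await asyncio.sleep(0.1)
        return "connected"

    future = submit_async(fetch_status())
    try:
        # Blocks the *calling* thread until the coroutine finishes or the timeout hits.
        print(future.result(timeout=5))
    except concurrent.futures.TimeoutError:
        print("coroutine did not finish in time")
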
    • What about the advice to use ProcessPoolExecutor?

      Those apply to running CPU-bound code in parallel processes to avoid the GIL. If you are actually running async code, you shouldn't care about ProcessPoolExecutor.

    • What about the advice to use ThreadPoolExecutor?

      A ThreadPoolExecutor is simply a thread pool useful for classic multi-threaded applications. In Python it is used primarily to make the program more responsive, not to make it faster. It allows you to run CPU-bound or blocking code in parallel with interactive code, with neither getting starved. It won't make things faster due to the GIL.
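
      For completeness, this is roughly where those executors would come in if you did have blocking or CPU-bound work to call from a coroutine. A sketch with made-up blocking_read and crunch_numbers functions (not from the original question or answer):

        import asyncio
        import time
        from concurrent.futures import ProcessPoolExecutor

        def blocking_read():
            time.sleep(1)                                   # stands in for a blocking call
            return "data"

        def crunch_numbers():
            return sum(i * i for i in range(10_000_000))    # stands in for CPU-bound work

        async def main():
            loop = asyncio.get_running_loop()
            # Blocking I/O: the default ThreadPoolExecutor keeps the event loop responsive.
            data = await loop.run_in_executor(None, blocking_read)
            # CPU-bound work: a ProcessPoolExecutor sidesteps the GIL.
            with ProcessPoolExecutor() as pool:
                total = await loop.run_in_executor(pool, crunch_numbers)
            print(data, total)

        if __name__ == "__main__":   # guard needed because ProcessPoolExecutor may spawn new processes
            asyncio.run(main())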
