Python 3:如何将异步函数提交给threadPool? [英] Python 3: How to submit an async function to a threadPool?

查看:78
本文介绍了Python 3:如何将异步函数提交给threadPool?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想同时使用来自 concurrent.futures ThreadPoolExecutor 和异步函数.

I want to use both ThreadPoolExecutor from concurrent.futures and async functions.

我的程序反复向线程池提交具有不同输入值的函数.在更大的函数中执行的任务的最终顺序可以是任意顺序,我不在乎返回值,只是它们会在将来的某个时刻执行.

My program repeatedly submits a function with different input values to a thread pool. The final sequence of tasks that are executed in that larger function can be in any order, and I don't care about the return value, just that they execute at some point in the future.

所以我试图做到这一点

async def startLoop():

    while 1:
        for item in clients:
            arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))

        wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

提交的函数为:

async def threadWork(obj):
   bool = do_something() # needs to execute before next functions
   if bool:
       do_a() # can be executed at any time
       do_b() # ^

其中 do_b do_a 是异步函数.问题是我得到了错误: TypeError:object Future不能在'await'表达式,如果我删除了await,则会收到另一个错误消息,提示我需要添加 await .

where do_b and do_a are async functions.The problem with this is that I get the error: TypeError: object Future can't be used in 'await' expression and if I remove the await, I get another error saying I need to add await.

我想我可以使所有东西都使用线程,但是我真的不想这么做.

I guess I could make everything use threads, but I don't really want to do that.

推荐答案

我建议仔细阅读Python 3的 asyncio开发指南,尤其是并发和多线程"部分.

I recommend a careful readthrough of Python 3's asyncio development guide, particularly the "Concurrency and Multithreading" section.

示例中的主要概念问题是事件循环是单线程的,因此在线程池中执行异步协程没有任何意义.事件循环和线程进行交互的方式有几种:

The main conceptual issue in your example that event loops are single-threaded, so it doesn't make sense to execute an async coroutine in a thread pool. There are a few ways for event loops and threads to interact:

  • 每个线程的事件循环.例如:

  • Event loop per thread. For example:

 async def threadWorkAsync(obj):
     b = do_something()
     if b:
         # Run a and b as concurrent tasks
         task_a = asyncio.create_task(do_a())
         task_b = asyncio.create_task(do_b())
         await task_a
         await task_b

 def threadWork(obj):
     # Create run loop for this thread and block until completion
     asyncio.run(threadWorkAsync())

 def startLoop():
     while 1:
         arrayOfFutures = []
         for item in clients:
             arrayOfFutures.append(config.threadPool.submit(threadWork, item))

         wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

  • 在执行程序中执行阻止代码.这样,您就可以使用异步期货代替上述的并发期货.

  • Execute blocking code in an executor. This allows you to use async futures instead of concurrent futures as above.

     async def startLoop():
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(asyncio.run_in_executor(
                     config.threadPool, threadWork, item))
    
             await asyncio.gather(*arrayOfFutures)
    

  • 使用线程安全功能将任务提交到线程之间的事件循环.例如,您可以在主线程的运行循环中运行所有异步协程,而不必为每个线程创建一个运行循环:

  • Use threadsafe functions to submit tasks to event loops across threads. For example, instead of creating a run loop for each thread you could run all async coroutines in the main thread's run loop:

     def threadWork(obj, loop):
         b = do_something()
         if b:
             future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
             future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
             concurrent.futures.wait([future_a, future_b])
    
     async def startLoop():
         loop = asyncio.get_running_loop()
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(asyncio.run_in_executor(
                     config.threadPool, threadWork, item, loop))
    
             await asyncio.gather(*arrayOfFutures)
    

    注意:此示例不应从字面上使用,因为它会导致所有协程在主线程中执行,而线程池工作程序只是阻塞.这只是为了展示 run_coroutine_threadsafe()方法的示例.

    Note: This example should not be used literally as it will result in all coroutines executing in the main thread while the thread pool workers just block. This is just to show an example of the run_coroutine_threadsafe() method.

    这篇关于Python 3:如何将异步函数提交给threadPool?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆