python asyncio,如何从另一个线程创建和取消任务 [英] python asyncio, how to create and cancel tasks from another thread
问题描述
我有一个python多线程应用程序.我想在一个线程中运行一个asyncio循环,并从另一个线程向其发布回调和协程.应该很容易,但是我无法理解 asyncio 的内容.
I have a python multi-threaded application. I want to run an asyncio loop in a thread and post calbacks and coroutines to it from another thread. Should be easy but I cannot get my head around the asyncio stuff.
我想到了以下解决方案,该解决方案可以完成我想要的一半,请随时对任何内容发表评论:
I came up to the following solution which does half of what I want, feel free to comment on anything:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
因此,开始和停止循环工作正常.我曾考虑过使用create_task创建任务,但是该方法不是线程安全的,因此我将其包装在call_soon_threadsafe中.但是我希望能够获得任务对象以便能够取消任务.我可以使用Future和Condition做一个复杂的事情,但是必须有一个简单的方法,不是吗?
So starting and stoping the loop works fine. I thought about creating task using create_task, but that method is not threadsafe so I wrapped it in call_soon_threadsafe. But I would like to be able to get the task object in order to be able to cancel the task. I could do a complicated stuff using Future and Condition, but there must be a simplier way, isnt'it?
推荐答案
我认为您可能需要使add_task
方法了解是否从事件循环以外的线程中调用了它.这样,如果从同一线程中调用它,则可以直接调用asyncio.async
,否则,它可以做一些额外的工作来将任务从循环线程传递到调用线程.这是一个示例:
I think you may need to make your add_task
method aware of whether or not its being called from a thread other than the event loop's. That way, if it's being called from the same thread, you can just call asyncio.async
directly, otherwise, it can do some extra work to pass the task from the loop's thread to the calling thread. Here's an example:
import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we're not going between threads.
else:
# We're in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it's ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()
首先,我们将事件循环的线程ID保存在run
方法中,这样我们便可以弄清楚以后对add_task
的调用是否来自其他线程.如果从非事件循环线程中调用了add_task
,我们将使用call_soon_threadsafe
调用一个将调度协程的函数,然后使用concurrent.futures.Future
将任务传递回调用线程,该线程将等待Future
的结果.
First, we save the thread id of the event loop in the run
method, so we can figure out if calls to add_task
are coming from other threads later. If add_task
is called from a non-event loop thread, we use call_soon_threadsafe
to call a function that will both schedule the coroutine, and then use a concurrent.futures.Future
to pass the task back to the calling thread, which waits on the result of the Future
.
有关取消任务的注意事项:您在Task
上调用cancel
时,下次事件循环运行时,协程中将出现CancelledError
.这意味着,任务包装的协程将在下一次到达屈服点时因异常而中止-除非协程捕获到CancelledError
并防止其自身中止.另请注意,这仅在要包装的函数实际上是可中断的协程时才有效;例如,由BaseEventLoop.run_in_executor
返回的asyncio.Future
并不能真正被取消,因为它实际上包裹在concurrent.futures.Future
周围,并且一旦其基础函数实际开始执行,就无法将其取消.在这种情况下,asyncio.Future
会说它已取消,但是在执行程序中实际运行的功能将继续运行.
A note on cancelling a task: You when you call cancel
on a Task
, a CancelledError
will be raised in the coroutine the next time the event loop runs. This means that the coroutine that the Task is wrapping will aborted due to the exception the next time it hit a yield point - unless the coroutine catches the CancelledError
and prevents itself from aborting. Also note that this only works if the function being wrapped is actually an interruptible coroutine; an asyncio.Future
returned by BaseEventLoop.run_in_executor
, for example, can't really be cancelled, because it's actually wrapped around a concurrent.futures.Future
, and those can't be cancelled once their underlying function actually starts executing. In those cases, the asyncio.Future
will say its cancelled, but the function actually running in the executor will continue to run.
根据安德鲁·斯维特洛夫(Andrew Svetlov)的建议,将第一个示例更新为使用concurrent.futures.Future
而不是queue.Queue
.
Updated the first example to use concurrent.futures.Future
, instead of a queue.Queue
, per Andrew Svetlov's suggestion.
注意: asyncio.async
已弃用,因为版本3.4.4使用 asyncio.ensure_future
代替.
Note: asyncio.async
is deprecated since version 3.4.4 use asyncio.ensure_future
instead.
这篇关于python asyncio,如何从另一个线程创建和取消任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!