python asyncio,如何从另一个线程创建和取消任务 [英] python asyncio, how to create and cancel tasks from another thread

查看:304
本文介绍了python asyncio,如何从另一个线程创建和取消任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个python多线程应用程序.我想在一个线程中运行一个asyncio循环,并从另一个线程向其发布回调和协程.应该很容易,但是我无法理解 asyncio 的内容.

I have a python multi-threaded application. I want to run an asyncio loop in a thread and post calbacks and coroutines to it from another thread. Should be easy but I cannot get my head around the asyncio stuff.

我想到了以下解决方案,该解决方案可以完成我想要的一半,请随时对任何内容发表评论:

I came up to the following solution which does half of what I want, feel free to comment on anything:

import asyncio
from threading import Thread

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop) #why do I need that??
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        f = functools.partial(self.loop.create_task, coro)
        return self.loop.call_soon_threadsafe(f)

    def cancel_task(self, xx):
        #no idea

@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?

b.stop()

因此,开始和停止循环工作正常.我曾考虑过使用create_task创建任务,但是该方法不是线程安全的,因此我将其包装在call_soon_threadsafe中.但是我希望能够获得任务对象以便能够取消任务.我可以使用Future和Condition做一个复杂的事情,但是必须有一个简单的方法,不是吗?

So starting and stoping the loop works fine. I thought about creating task using create_task, but that method is not threadsafe so I wrapped it in call_soon_threadsafe. But I would like to be able to get the task object in order to be able to cancel the task. I could do a complicated stuff using Future and Condition, but there must be a simplier way, isnt'it?

推荐答案

我认为您可能需要使add_task方法了解是否从事件循环以外的线程中调用了它.这样,如果从同一线程中调用它,则可以直接调用asyncio.async,否则,它可以做一些额外的工作来将任务从循环线程传递到调用线程.这是一个示例:

I think you may need to make your add_task method aware of whether or not its being called from a thread other than the event loop's. That way, if it's being called from the same thread, you can just call asyncio.async directly, otherwise, it can do some extra work to pass the task from the loop's thread to the calling thread. Here's an example:

import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
    def __init__(self, start_event):
        Thread.__init__(self)
        self.loop = None
        self.tid = None
        self.event = start_event

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.tid = current_thread()
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        def _async_add(func, fut):
            try:
                ret = func()
                fut.set_result(ret)
            except Exception as e:
                fut.set_exception(e)

        f = functools.partial(asyncio.async, coro, loop=self.loop)
        if current_thread() == self.tid:
            return f() # We can call directly if we're not going between threads.
        else:
            # We're in a non-event loop thread so we use a Future
            # to get the task from the event loop thread once
            # it's ready.
            fut = Future()
            self.loop.call_soon_threadsafe(_async_add, f, fut)
            return fut.result()

    def cancel_task(self, task):
        self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()

首先,我们将事件循环的线程ID保存在run方法中,这样我们便可以弄清楚以后对add_task的调用是否来自其他线程.如果从非事件循环线程中调用了add_task,我们将使用call_soon_threadsafe调用一个将调度协程的函数,然后使用concurrent.futures.Future将任务传递回调用线程,该线程将等待Future的结果.

First, we save the thread id of the event loop in the run method, so we can figure out if calls to add_task are coming from other threads later. If add_task is called from a non-event loop thread, we use call_soon_threadsafe to call a function that will both schedule the coroutine, and then use a concurrent.futures.Future to pass the task back to the calling thread, which waits on the result of the Future.

有关取消任务的注意事项:您在Task上调用cancel时,下次事件循环运行时,协程中将出现CancelledError.这意味着,任务包装的协程将在下一次到达屈服点时因异常而中止-除非协程捕获到CancelledError并防止其自身中止.另请注意,这仅在要包装的函数实际上是可中断的协程时才有效;例如,由BaseEventLoop.run_in_executor返回的asyncio.Future并不能真正被取消,因为它实际上包裹在concurrent.futures.Future周围,并且一旦其基础函数实际开始执行,就无法将其取消.在这种情况下,asyncio.Future会说它已取消,但是在执行程序中实际运行的功能将继续运行.

A note on cancelling a task: You when you call cancel on a Task, a CancelledError will be raised in the coroutine the next time the event loop runs. This means that the coroutine that the Task is wrapping will aborted due to the exception the next time it hit a yield point - unless the coroutine catches the CancelledError and prevents itself from aborting. Also note that this only works if the function being wrapped is actually an interruptible coroutine; an asyncio.Future returned by BaseEventLoop.run_in_executor, for example, can't really be cancelled, because it's actually wrapped around a concurrent.futures.Future, and those can't be cancelled once their underlying function actually starts executing. In those cases, the asyncio.Future will say its cancelled, but the function actually running in the executor will continue to run.

根据安德鲁·斯维特洛夫(Andrew Svetlov)的建议,将第一个示例更新为使用concurrent.futures.Future而不是queue.Queue.

Updated the first example to use concurrent.futures.Future, instead of a queue.Queue, per Andrew Svetlov's suggestion.

注意: asyncio.async 已弃用,因为版本3.4.4使用 asyncio.ensure_future 代替.

Note: asyncio.async is deprecated since version 3.4.4 use asyncio.ensure_future instead.

这篇关于python asyncio,如何从另一个线程创建和取消任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆