Python Asyncio run_forever() 和任务 [英] Python Asyncio run_forever() and Tasks

查看:27
本文介绍了Python Asyncio run_forever() 和任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我修改了此代码以在异步 Python 中使用 Google Cloud PubSub:https://github.com/cloudfind/google-pubsub-asyncio

I adapted this code for using Google Cloud PubSub in Async Python: https://github.com/cloudfind/google-pubsub-asyncio

import asyncio
import datetime
import functools
import os

from google.cloud import pubsub
from google.gax.errors import RetryError
from grpc import StatusCode

async def message_producer():
    """ Publish messages which consist of the current datetime """
    while True:
        await asyncio.sleep(0.1)


async def proc_message(message):
    await asyncio.sleep(0.1)
    print(message)
    message.ack()


def main():
    """ Main program """
    loop = asyncio.get_event_loop()

    topic = "projects/{project_id}/topics/{topic}".format(
        project_id=PROJECT, topic=TOPIC)
    subscription_name = "projects/{project_id}/subscriptions/{subscription}".format(
        project_id=PROJECT, subscription=SUBSCRIPTION)

    subscription = make_subscription(
        topic, subscription_name)

    def create_proc_message_task(message):
        """ Callback handler for the subscription; schedule a task on the event loop """
        print("Task created!")
        task = loop.create_task(proc_message(message))

    subscription.open(create_proc_message_task)
    # Produce some messages to consume

    loop.create_task(message_producer())

    print("Subscribed, let's do this!")
    loop.run_forever()


def make_subscription(topic, subscription_name):
    """ Make a publisher and subscriber client, and create the necessary resources """
    subscriber = pubsub.SubscriberClient()
    try:
        subscriber.create_subscription(subscription_name, topic)
    except:
        pass
    subscription = subscriber.subscribe(subscription_name)

    return subscription


if __name__ == "__main__":
    main()

我基本去掉了发布代码,只使用订阅代码.但是,最初我没有包含 loop.create_task(message_producer()) 行.我认为任务是按预期创建的,但是它们实际上从未运行过.仅当我添加所述行时,代码才能正确执行并且所有创建的任务都会运行.是什么导致了这种行为?

I basically removed the publishing code and only use the subscription code. However, initially I did not include the loop.create_task(message_producer()) line. I figured that tasks were created as they were supposed to however they never actually run themselves. Only if I add said line the code properly executes and all created Tasks run. What causes this behaviour?

推荐答案

PubSub 正在从不同的线程调用 create_proc_message_task 回调.由于 create_task不是线程安全的,它只能从运行事件循环的线程(通常是主线程)调用.要解决此问题,请将 loop.create_task(proc_message(message)) 替换为 asyncio.run_coroutine_threadsafe(proc_message(message), loop)message_producer将不再需要.

PubSub is calling the create_proc_message_task callback from a different thread. Since create_task is not thread-safe, it must only be called from the thread that runs the event loop (typically the main thread). To correct the issue, replace loop.create_task(proc_message(message)) with asyncio.run_coroutine_threadsafe(proc_message(message), loop) and message_producer will no longer be needed.

至于为什么 message_producer 似乎修复了代码,请考虑 run_coroutine_threadsafecreate_task 相比还做了两件事:

As for why message_producer appeared to fix the code, consider that run_coroutine_threadsafe does two additional things compared to create_task:

  • 它以线程安全的方式运行,因此在并发执行时不会损坏事件循环数据结构.
  • 它确保事件循环在尽可能快的机会唤醒,以便它可以处理新任务.

在您的情况下,create_task 将任务添加到循环的可运行队列(没有任何锁定),但无法确保唤醒,因为在事件循环线程中运行时不需要.message_producer 然后用于强制循环定期唤醒,这也是它检查和执行可运行任务的时间.

In your case create_task added the task to the loop's runnable queue (without any locking), but failed to ensure the wakeup, because that is not needed when running in the event loop thread. The message_producer then served to force the loop to wake up in regular intervals, which is when it also checks and executes the runnable tasks.

这篇关于Python Asyncio run_forever() 和任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆