如何将自定义任务集成到 aiogram 执行器中? [英] How to integrate custom task into aiogram executor?

查看:26
本文介绍了如何将自定义任务集成到 aiogram 执行器中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
    # old style:
    # await bot.send_message(message.chat.id, message.text)

    chat_ids[message.message_id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)


async def periodic(sleep_for, queue):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        print(f"{now}")
        for id in chat_ids:
            queue.put_nowait((id, f"{now}"))
            # await bot.send_message(id, f"{now}", disable_notification=True)


def run_tick(queue):
    newloop = asyncio.new_event_loop()
    asyncio.set_event_loop(newloop)
    asyncio.run(periodic(3, queue))


if __name__ == '__main__':
    queue = asyncio.Queue()
    Thread(target=run_tick, args=(queue,), daemon=True).start()
    executor.start_polling(dp, skip_updates=True)

我想在有事件但暂时失败时通过 bot.send_message 向注册用户发送消息.这是我尝试过的.

I want to send messages to registered users by bot.send_message when there is a event but failed for now. Here are what I tried.

  1. bot.send_message 崩溃,因为它是从另一个线程调用的.(超时上下文管理器应该在任务中使用)
  2. 因此,我尝试使用队列来解决此问题,但无法将我自己的任务添加到执行程序中.

有没有什么简单的方法可以做到这一点?

Is there any simple way to do this?

2020-1-3

这是@user4815162342 的工作示例.

Here's working example as per @user4815162342.

import asyncio
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
    chat_ids[message.from_user.id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)

async def periodic(sleep_for):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        print(f"{now}")
        for id in chat_ids:
            await bot.send_message(id, f"{now}", disable_notification=True)

if __name__ == '__main__':
    dp.loop.create_task(periodic(10))
    executor.start_polling(dp)

推荐答案

最初的问题是您试图从不同的线程调用 asyncio 代码.为了修复由此产生的错误,您创建了一个新的事件循环,同时保留了额外的线程.俗话说,现在你有两个问题.

The initial problem was that you tried to call asyncio code from a different thread. To fix the resulting error you have created a new event loop while keeping the additional thread. As the saying goes, now you have two problems.

队列的想法看起来还没有完成,因为没有从队列中读取的代码;即使有,它也不会工作,因为 asyncio 队列不是为了在事件循环或线程之间共享而设计的.为了解决这个问题,您需要找到一种方法从事件循环中运行定期更新,即重新检查这个假设:

The queue idea looks unfinished because there is no code that reads from the queue; and even if there were, it wouldn't work because asyncio queues are not designed to be shared between event loops or between threads. To untangle the mess, you need to find a way to run your periodic updates from within the event loop, i.e. re-examine this assumption:

但是没有办法将我自己的任务添加到执行器中.

but there is no way to add my own task into executor.

查看来源Executor,它似乎从调度程序中获取事件循环,调度程序将其保存在可公开访问的 loop 属性中.这意味着您只需调用 <该循环上的 code>create_task 方法.例如:

Looking at the source of Executor, it appears to pick up the event loop from the dispatcher, which holds it in the publicly accessible loop attribute. That means that you can create a task simply by invoking the create_task method on that loop. For example:

if __name__ == '__main__':
    dp.loop.create_task(periodic())
    executor.start_polling(dp, skip_updates=True)

现在 periodic 可以在您最初的尝试中制定:

Now periodic can be formulated as in your initial attempt:

async def periodic(sleep_for, queue):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        for id in chat_ids:
            await bot.send_message(id, f"{now}",
                                   disable_notification=True)

请注意,我没有对此进行测试,因为我不使用 aiogram.您可能需要解决的一个潜在问题是您的 chat_ids dict 似乎包含 message.message_id 作为键,而 bot.send_message 接受一个message.chat.id.

Please note that I haven't tested this because I do not use aiogram. A potential issue that you might need to address is that your chat_ids dict appears to contain message.message_id as key, whereas bot.send_message accepts a message.chat.id.

这篇关于如何将自定义任务集成到 aiogram 执行器中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆