在 PyQt 应用程序中嵌入 aiohttp 服务器 [英] embed a aiohttp server in a PyQt application

查看:63
本文介绍了在 PyQt 应用程序中嵌入 aiohttp 服务器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我打算在 PyQt 应用程序中嵌入一个 aiohttp 服务器,但是当我运行下面的代码时,Qt 窗口无法显示,我知道这是由 web.run_app(app),我试图将它移动到一个线程中,但是我得到了 RuntimeError: There is no current event loop in thread 'Dummy-1',那么我该怎么办?我发现 asyncqt 这可能会有所帮助,但我不知道如何使用它来处理aiohttp 服务器.

I am going to embed a aiohttp server in a PyQt application, but when I run the code below , the Qt window couldn't show, I know it was caused by web.run_app(app), I've tried to move it into a thread , but then I got RuntimeError: There is no current event loop in thread 'Dummy-1', so what should I do ? I've found asyncqt which might help ,but I don't know how to use it to deal with a aiohttp server.

from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from aiohttp import web


class ThreadGo(QThread):  # threading.Thread
    # implementing new slots in a QThread subclass is error-prone and discouraged.

    def __init__(self, parent, func, *args, **kwargs):
        super().__init__(parent)
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = 0

        onFinished = self.kwargs.get('onFinished')
        self.finished.connect(onFinished) if onFinished else None  # 用lambda还不行呢

        self.finished.connect(self.deleteLater)
        self.start()

    def run(self):

        self.result = self.func(*self.args)  # deleteLater


class Window(QMainWindow):

    def __init__(self, parent=None, **kwargs):
        super().__init__(parent, **kwargs)
        self.setUpHTTPServer()

    def setUpHTTPServer(self):
        async def hello(request):
            return web.Response(text="Hello, world")
        app = web.Application()
        app.add_routes([web.get('/', hello)])
        web.run_app(app)
        # ThreadGo(self, lambda:web.run_app(app))#get RuntimeError: There is no current event loop in thread 'Dummy-1


if __name__ == "__main__":
    from sys import argv, exit

    a = QApplication(argv)
    w = Window()
    w.show()
    exit(a.exec_())

推荐答案

在下面的例子中,我展示了如何在 aiohttp 服务器上使用 Qt:

In the following example I show how to use Qt with the aiohttp server:

import asyncio
from functools import cached_property

from PyQt5.QtWidgets import QApplication, QMainWindow

from asyncqt import QEventLoop

from aiohttp import web


class Window(QMainWindow):
    def __init__(self, parent=None, **kwargs):
        super().__init__(parent, **kwargs)
        self.setup_server()

    def setup_server(self):
        self.app.add_routes([web.get("/", self.hello)])

    @cached_property
    def app(self):
        return web.Application()

    async def hello(self, request):
        return web.Response(text="Hello, world")

    def run(self):
        web.run_app(self.app)


def main():
    import sys

    a = QApplication(sys.argv)

    loop = QEventLoop(a)
    asyncio.set_event_loop(loop)
    w = Window()
    w.show()

    w.run()


if __name__ == "__main__":
    main()

这篇关于在 PyQt 应用程序中嵌入 aiohttp 服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆