将Autobahn | Python与aiohttp集成 [英] Integrate Autobahn|Python with aiohttp

查看:101
本文介绍了将Autobahn | Python与aiohttp集成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将 aiohttp Web服务器集成到Crossbar + Autobahn系统体系结构中。

I'm trying to integrate an aiohttp web server into a Crossbar+Autobahn system architecture.

更多详细来说,当 aiohttp 服务器收到某个API调用时,它必须将消息发布到Crossbar路由器。
我见过此示例关于官方仓库,但我不知道如何将其集成到我的应用程序中。

More in detail, when the aiohttp server receive a certain API call, it has to publish a message to a Crossbar router. I've seen this example on the official repos but i've no clue on how to integrate it on my application.

理想情况下,我希望能够为此

Ideally, i would like to be able to do this

# class SampleTaskController(object):
async def handle_get_request(self, request: web.Request) -> web.Response:
    self.publisher.publish('com.myapp.topic1', 'Hello World!')
    return web.HTTPOk()

其中 self SampleTaskController(object)定义了Web服务器的所有路由处理程序。

where self il an instance of SampleTaskController(object) which defines all the routes handler of the web server.

def main(argv):
    cfg_path = "./task_cfg.json"
    if len(argv) > 1:
        cfg_path = argv[0]

    logging.basicConfig(level=logging.DEBUG,
                        format=LOG_FORMAT)

    loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(loop)

    app = web.Application(loop=loop)
    with open(cfg_path, 'r') as f:
        task_cfg = json.load(f)
        task_cfg['__cfg_path'] = cfg_path
        controller = SampleTaskController(task_cfg)
        controller.restore()
        app['controller'] = controller

        controller.setup_routes(app)

        app.on_startup.append(controller.on_startup)
        app.on_cleanup.append(controller.on_cleanup)
        web.run_app(app,
                    host=task_cfg['webserver_address'],
                    port=task_cfg['webserver_port'])

请注意,我正在使用 zmq.asyncio.ZMQEventLoop ,因为服务器也在监听 zmq 套接字,该套接字在 controller.on_startup 方法内部配置。

Notice that i'm using an zmq.asyncio.ZMQEventLoop because the server is also listening on a zmq socket, which is configured inside the controller.on_startup method.

高速公路,我还尝试使用 wampy 将消息发布到Crossbar,它可以工作,但是高速公路用户无法正确解析消息。

Instead of using autobahn, i've also tried to publish the message to Crossbar using wampy and it works, but the autobahn subscribers couldn't correctly parse the message.

# autobahn subscriber
class ClientSession(ApplicationSession):
    async def onJoin(self, details):

        self.log.info("Client session joined {details}", details=details)

        self.log.info("Connected:  {details}", details=details)

        self._ident = details.authid
        self._type = u'Python'

        self.log.info("Component ID is  {ident}", ident=self._ident)
        self.log.info("Component type is  {type}", type=self._type)

        # SUBSCRIBE

        def gen_on_something(thing):
            def on_something(counter, id, type):
                print('----------------------------')
                self.log.info("'on_{something}' event, counter value: {message}",something=thing, message=counter)
                self.log.info("from component {id} ({type})", id=id, type=type)
            return on_something

        await self.subscribe(gen_on_something('landscape'), 'landscape')
        await self.subscribe(gen_on_something('nature'), 'nature')

-

# wampy publisher
async def publish():
    router = Crossbar(config_path='./crossbar.json')
    logging.getLogger().debug(router.realm)
    logging.getLogger().debug(router.url)
    logging.getLogger().debug(router.port)

    client = Client(router=router)
    client.start()

    result = client.publish(topic="nature", message=0)
    logging.getLogger().debug(result)

使用此配置,订户将收到消息pub

With this configuration the subscriber receive the message published, but it get an exception while parsing it.

TypeError: on_something() got an unexpected keyword argument 'message'


推荐答案

最近,我尝试同时使用aiohttp和autobahn。我重新整理了纵横制文档中的示例(最初使用了Twisted),并获得了以下代码:

Recently I tried to use aiohttp and autobahn simultaneously. I reworked the example from crossbar documentation (originally using twisted) and got the following code:

import asyncio
import logging

from aiohttp import web
from aiohttp.web_exceptions import HTTPOk, HTTPInternalServerError
from autobahn.asyncio.component import Component

# Setup logging
logger = logging.getLogger(__name__)


class WebApplication(object):
    """
    A simple Web application that publishes an event every time the
    url "/" is visited.
    """

    count = 0

    def __init__(self, app, wamp_comp):
        self._app = app
        self._wamp = wamp_comp
        self._session = None  # "None" while we're disconnected from WAMP router

        # associate ourselves with WAMP session lifecycle
        self._wamp.on('join', self._initialize)
        self._wamp.on('leave', self._uninitialize)

        self._app.router.add_get('/', self._render_slash)

    def _initialize(self, session, details):
        logger.info("Connected to WAMP router (session: %s, details: %s)", session, details)
        self._session = session

    def _uninitialize(self, session, reason):
        logger.warning("Lost WAMP connection (session: %s, reason: %s)", session, reason)
        self._session = None

    async def _render_slash(self, request):
        if self._session is None:
            return HTTPInternalServerError(reason="No WAMP session")
        self.count += 1
        self._session.publish(u"com.myapp.request_served", self.count, count=self.count)
        return HTTPOk(text="Published to 'com.myapp.request_served'")


def main():
    REALM = "crossbardemo"
    BROKER_URI = "ws://wamp_broker:9091/ws"
    BIND_ADDR = "0.0.0.0"
    BIND_PORT = 8080

    logging.basicConfig(
        level='DEBUG',
        format='[%(asctime)s %(levelname)s %(name)s:%(lineno)d]: %(message)s')

    logger.info("Starting aiohttp backend at %s:%s...", BIND_ADDR, BIND_PORT)
    loop = asyncio.get_event_loop()

    component = Component(
        transports=BROKER_URI,
        realm=REALM,
    )
    component.start(loop=loop)

    # When not using run() we also must start logging ourselves.
    import txaio
    txaio.start_logging(level='info')

    app = web.Application(
        loop=loop)

    _ = WebApplication(app, component)

    web.run_app(app, host=BIND_ADDR, port=BIND_PORT)


if __name__ == '__main__':
    main()

这篇关于将Autobahn | Python与aiohttp集成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆