将Autobahn | Python与aiohttp集成 [英] Integrate Autobahn|Python with aiohttp
问题描述
我正在尝试将 aiohttp
Web服务器集成到Crossbar + Autobahn系统体系结构中。
I'm trying to integrate an aiohttp
web server into a Crossbar+Autobahn system architecture.
更多详细来说,当 aiohttp
服务器收到某个API调用时,它必须将消息发布到Crossbar路由器。
我见过此示例关于官方仓库,但我不知道如何将其集成到我的应用程序中。
More in detail, when the aiohttp
server receive a certain API call, it has to publish a message to a Crossbar router.
I've seen this example on the official repos but i've no clue on how to integrate it on my application.
理想情况下,我希望能够为此
Ideally, i would like to be able to do this
# class SampleTaskController(object):
async def handle_get_request(self, request: web.Request) -> web.Response:
self.publisher.publish('com.myapp.topic1', 'Hello World!')
return web.HTTPOk()
其中 self
是 SampleTaskController(object)$ c的实例$ c>定义了Web服务器的所有路由处理程序。
where self
il an instance of SampleTaskController(object)
which defines all the routes handler of the web server.
def main(argv):
cfg_path = "./task_cfg.json"
if len(argv) > 1:
cfg_path = argv[0]
logging.basicConfig(level=logging.DEBUG,
format=LOG_FORMAT)
loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(loop)
app = web.Application(loop=loop)
with open(cfg_path, 'r') as f:
task_cfg = json.load(f)
task_cfg['__cfg_path'] = cfg_path
controller = SampleTaskController(task_cfg)
controller.restore()
app['controller'] = controller
controller.setup_routes(app)
app.on_startup.append(controller.on_startup)
app.on_cleanup.append(controller.on_cleanup)
web.run_app(app,
host=task_cfg['webserver_address'],
port=task_cfg['webserver_port'])
请注意,我正在使用 zmq.asyncio.ZMQEventLoop
,因为服务器也在监听 zmq
套接字,该套接字在 controller.on_startup
方法内部配置。
Notice that i'm using an zmq.asyncio.ZMQEventLoop
because the server is also listening on a zmq
socket, which is configured inside the controller.on_startup
method.
高速公路,我还尝试使用 wampy
将消息发布到Crossbar,它可以工作,但是高速公路用户无法正确解析消息。
Instead of using autobahn, i've also tried to publish the message to Crossbar using wampy
and it works, but the autobahn subscribers couldn't correctly parse the message.
# autobahn subscriber
class ClientSession(ApplicationSession):
async def onJoin(self, details):
self.log.info("Client session joined {details}", details=details)
self.log.info("Connected: {details}", details=details)
self._ident = details.authid
self._type = u'Python'
self.log.info("Component ID is {ident}", ident=self._ident)
self.log.info("Component type is {type}", type=self._type)
# SUBSCRIBE
def gen_on_something(thing):
def on_something(counter, id, type):
print('----------------------------')
self.log.info("'on_{something}' event, counter value: {message}",something=thing, message=counter)
self.log.info("from component {id} ({type})", id=id, type=type)
return on_something
await self.subscribe(gen_on_something('landscape'), 'landscape')
await self.subscribe(gen_on_something('nature'), 'nature')
-
# wampy publisher
async def publish():
router = Crossbar(config_path='./crossbar.json')
logging.getLogger().debug(router.realm)
logging.getLogger().debug(router.url)
logging.getLogger().debug(router.port)
client = Client(router=router)
client.start()
result = client.publish(topic="nature", message=0)
logging.getLogger().debug(result)
使用此配置,订户将收到消息pub
With this configuration the subscriber receive the message published, but it get an exception while parsing it.
TypeError: on_something() got an unexpected keyword argument 'message'
推荐答案
最近,我尝试同时使用aiohttp和autobahn。我重新整理了纵横制文档中的示例(最初使用了Twisted),并获得了以下代码:
Recently I tried to use aiohttp and autobahn simultaneously. I reworked the example from crossbar documentation (originally using twisted) and got the following code:
import asyncio
import logging
from aiohttp import web
from aiohttp.web_exceptions import HTTPOk, HTTPInternalServerError
from autobahn.asyncio.component import Component
# Setup logging
logger = logging.getLogger(__name__)
class WebApplication(object):
"""
A simple Web application that publishes an event every time the
url "/" is visited.
"""
count = 0
def __init__(self, app, wamp_comp):
self._app = app
self._wamp = wamp_comp
self._session = None # "None" while we're disconnected from WAMP router
# associate ourselves with WAMP session lifecycle
self._wamp.on('join', self._initialize)
self._wamp.on('leave', self._uninitialize)
self._app.router.add_get('/', self._render_slash)
def _initialize(self, session, details):
logger.info("Connected to WAMP router (session: %s, details: %s)", session, details)
self._session = session
def _uninitialize(self, session, reason):
logger.warning("Lost WAMP connection (session: %s, reason: %s)", session, reason)
self._session = None
async def _render_slash(self, request):
if self._session is None:
return HTTPInternalServerError(reason="No WAMP session")
self.count += 1
self._session.publish(u"com.myapp.request_served", self.count, count=self.count)
return HTTPOk(text="Published to 'com.myapp.request_served'")
def main():
REALM = "crossbardemo"
BROKER_URI = "ws://wamp_broker:9091/ws"
BIND_ADDR = "0.0.0.0"
BIND_PORT = 8080
logging.basicConfig(
level='DEBUG',
format='[%(asctime)s %(levelname)s %(name)s:%(lineno)d]: %(message)s')
logger.info("Starting aiohttp backend at %s:%s...", BIND_ADDR, BIND_PORT)
loop = asyncio.get_event_loop()
component = Component(
transports=BROKER_URI,
realm=REALM,
)
component.start(loop=loop)
# When not using run() we also must start logging ourselves.
import txaio
txaio.start_logging(level='info')
app = web.Application(
loop=loop)
_ = WebApplication(app, component)
web.run_app(app, host=BIND_ADDR, port=BIND_PORT)
if __name__ == '__main__':
main()
这篇关于将Autobahn | Python与aiohttp集成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!