如何创建一个带有两个线程的 python 应用程序,每个线程都有一个高速公路应用程序 [英] How create a python application with two thread each which has a autobahn application

查看:30
本文介绍了如何创建一个带有两个线程的 python 应用程序,每个线程都有一个高速公路应用程序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我还没有找到任何解决我的问题的方法.我需要创建一个带有两个线程的 python 应用程序,每个线程都使用高速公路库连接到 WAMP 路由器.

I have not found any solution for my problem. I need to create a python application with two thread, each of which is connected to a WAMP Router using autobahn library.

按照我写的实验代码:

wampAddress = 'ws://172.17.3.139:8181/ws'
wampRealm = 's4t'

from threading import Thread
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks


class AutobahnMRS(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        def onMessage(*args):
            print args
        try:
            yield self.subscribe(onMessage, 'test')
            print ("Subscribed to topic: test")

        except Exception as e:
            print("Exception:" +e)

class AutobahnIM(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        try:
            yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO')
            print ("Subscribed to topic: test")

        except Exception as e:
            print("Exception:" +e)

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnMRS);


class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnIM);

#class S4tServer:

if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()

    thread1 = Thread(target = server.start())
    thread1.start()
    thread1.join()

    thread2 = Thread(target = sendMessage.start())
    thread2.start()
    thread2.join()

当我启动这个 python 应用程序时,只启动了 thread1,稍后当我终止应用程序 (ctrl-c) 时,会显示以下错误消息:

When I launch this python application only the thread1 is started and later when I kill the application (ctrl-c) the following error messages are shown:

Sessio attached [Connect to WAMP Router]
Subscribed to topic: test
^CTraceback (most recent call last):
  File "test_pub.py", line 71, in <module>
    p2 = multiprocessing.Process(target = server.start())
  File "test_pub.py", line 50, in start
    self.runner.run(AutobahnMRS);
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run
    reactor.run()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run
    self.startRunning(installSignalHandlers=installSignalHandlers)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning
    ReactorBase.startRunning(self)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning
    raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable

我需要在一个具有其功能的应用程序中实现,并且它必须具有一个系统,用于通过高速公路 python 库与 WAMP 路由器进行通信.

I need to implement in one application that has its functionalities and also it must has a system for communicate to a WAMP router with autobahn python library.

换句话说,我需要一个能够与 WAMP 路由器通信的解决方案,但同时这个应用程序不必被高速公路部分阻塞(我认为解决方案是启动两个线程,一个线程管理一些功能,第二个线程管理高速公路部分).

In other words, I need a solution able to communicate with a WAMP router, but in the same time this application doesn't must be blocked with the autobahn part (I think that the solution is to start two thread, one thread manages some functions and second thread manages the autobahn part).

使用我之前提出的模式,还有一个问题,需要在 WAMP 路由器上的特定主题中从无高速公路线程"中的应用程序部分发送消息,应通过调用此功能一个特定的功能而不阻塞其他功能.

With the schema that I proposed before, there is another problem, the need to send message, in a specific topic on the WAMP router, from the application part in the 'no autobahn thread', this functionality should be called through with a specific function without blocking the other functionalities.

我希望我已经提供了所有细节.

I hope I have given all the details.

非常感谢您的回复

--------------------------------编辑---------------------------------

--------------------------------EDIT---------------------------------

经过一番研究,我已经实现了我需要的 websocket 协议,代码如下:

After some research I have implemented what I need for websocket protocol, the code is the follow:

# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        #log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------
lass BaseWBClient(object):

    def __init__(self, websocket_settings):
        #self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):

        log.msg("Connecting to host:port")
        self.factory = _WebSocketClientFactory(
                                "ws://host:port",
                                debug=True)
        self.factory.base_client = self

        c = connectWS(self.factory)

        self._reactor_thread = threading.Thread(target=reactor.run,
                                               args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.msg("Queing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.err("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.msg("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)

import time
def Ppippo(coda):
        while True:
            coda.send_message('YOOOOOOOO')
            time.sleep(5)

if __name__ == '__main__':

    ws_setting = {'host':'', 'port':}

    client = BaseWBClient(ws_setting)

    t1 = threading.Thread(client.connect())
    t11 = threading.Thread(Ppippo(client))
    t11.start()
    t1.start()

之前的代码工作正常,但我需要将其转换为在 WAMP 协议安装的 websocket 上运行.

The previous code work fine, but I need to translate this to operate on WAMP protocol insted websocket.

有人知道我是怎么解决的吗?

Does anyone know how I solve ?

推荐答案

坏消息是 Autobahn 正在使用 Twisted 主循环,所以你不能同时在两个线程中运行它.

The bad news is that Autobahn is using the Twisted main loop, so you can't run it in two threads at once.

好消息是,您不需要在两个线程中运行它来运行两件事情,无论如何,两个线程会更复杂.

The good news is that you don't need to run it in two threads to run two things, and two threads would be more complicated anyway.

启动多个应用程序的 API 有点混乱,因为您有两个 ApplicationRunner 对象,乍一看,您在高速公路上运行应用程序的方式是调用 ApplicationRunner.run.

The API to get started with multiple applications is a bit confusing, because you have two ApplicationRunner objects, and it looks at first glance that the way you run an application in autobahn is to call ApplicationRunner.run.

然而,ApplicationRunner 只是一种便利,它包装了设置应用程序的东西和运行主循环的东西;真正的工作发生在 WampWebSocketClientFactory 中.

However, ApplicationRunner is simply a convenience that wraps up the stuff that sets up the application and the stuff that runs the main loop; the real meat of the work happens in WampWebSocketClientFactory.

为了实现你想要的,你只需要摆脱线程,自己运行主循环,让 ApplicationRunner 实例简单地设置它们的应用程序.

In order to achieve what you want, you just need to get rid of the threads, and run the main loop yourself, making the ApplicationRunner instances simply set up their applications.

为了实现这一点,您需要更改程序的最后一部分来执行此操作:

In order to achieve this, you'll need to change the last part of your program to do this:

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

    def start(self):
        # Pass start_reactor=False to all runner.run() calls
        self.runner.run(AutobahnMRS, start_reactor=False)


class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

    def start(self):
        # Same as above
        self.runner.run(AutobahnIM, start_reactor=False)


if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()
    server.start()
    sendMessage.start()

    from twisted.internet import reactor
    reactor.run()

这篇关于如何创建一个带有两个线程的 python 应用程序,每个线程都有一个高速公路应用程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆