Python,从另一个类调用类函数 [英] Python, Call a class function from another class

查看:324
本文介绍了Python,从另一个类调用类函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请问有谁能帮帮我(新手),按照所附代码,在 process 类中调用 BroadcastServerFactory 类的 broadcast 函数?

我尝试过这么多方法调用另一个类的函数,但没有解决方案。

  import time,sys 
从apscheduler.scheduler导入调度程序
导入线程
导入套接字
从twisted.internet导入反应器
从twisted.python导入日志
从twisted.web.server导入网站
从twisted.web.static import文件

从autobahn.websocket import WebSocketServerFactory,\
WebSocketServerProtocol,\
listenWS


类进程(threading.Thread):
def __init __(self,buffer3):
threading.Thread .__ init __(self)
self.setDaemon(True)
self.buffer3 = buffer3


def run(self):
factory.broadcast(I do not know what我在做!)



class BroadcastServerProtocol(WebSocketServerProtocol):

def onOpen(self):
self.factory 。
self.factory.broadcast('%s'from%s。如果不是二进制文件,则为:




def onMessage(self,msg,binary) %bgb


def connectionLost(self,reason):
WebSocketServerProtocol.connectionLost(self,reason)
self.factory.unregister b

$ b类BroadcastServerFactory(WebSocketServerFactory):

简单广播服务器将其接收的任何消息广播到当前连接的所有客户端。


def __init __(self,url,debug = False,debugCodePaths = False):
WebSocketServerFactory .__ init __(self,url,debug = debug,debugCodePaths = debugCodePaths )
self.clients = []
self.tickcount = 0
self.tick()

def tick(self):
self.tickcount + = 1
self.broadcast(从服务器%self.tickcount打勾%d')
reactor.callLater(1,self.tick)

def register self,client):
如果不是self.clients中的客户端:
printregistered client+ client.peerstr
self.clients.append(client)

def unregister(self,client):
如果客户端在self.clients:
printunregistered client+ client.peerstr
self.clients.remove(client)

def broadcast(self,msg):
printbroadcasts'%s'..%msg
for self.clients:
c.sendMessage(msg)
print消息发送到+ c.peerstr


类BroadcastPreparedServerFactory(BroadcastServerFactory):

功能与上述相同,
prepareMessage和sendPreparedMessage。


def broadcast(self,msg):
print广播准备消息'%s'..%msg
preparedMsg = self.prepareMessage (msg)
for self.clients:
c.sendPreparedMessage(preparedMsg)
print准备消息发送到+ c.peerstr

$ b b def testing():
buffer2 - hello
myDisplay = process(buffer2)
myDisplay.start()


if __name__ == '__main__':

如果len(sys.argv)> 1和sys.argv [1] =='debug':
log.startLogging(sys.stdout)
debug = True
else:
debug = False
level_scheduler = Scheduler()
level_scheduler.add_interval_job(testing,seconds = 5)
level_scheduler.start b $ b #ServerFactory = BroadcastServerFactory
ServerFactory = BroadcastPreparedServerFactory

factory = ServerFactory(ws:// localhost:9000,
debug = debug,
debugCodePaths =调试)

factory.protocol = BroadcastServerProtocol
factory.setProtocolOptions(allowHixie76 = True)
listenWS(factory)

webdir = File(。 )
web = Site(webdir)
reactor.listenTCP(8080,web)


reactor.run()



感谢

解决方案

在创建 process 实例时,把需要被调用的 BroadcastServerFactory 类实例传给它:

 类进程(threading.Thread):
def __init __(self,buffer3m,broadcast_server_factory):
threading.Thread .__ init __(self)
self.setDaemon(True)
self.buffer3 = buffer3

self.factory = broadcast_server_factory

def run(self):
self。 factory.broadcast(我不知道我在做什么!)

然后调用它(它被赋值为 self.factory,并在 run 方法中使用)。我没有看到你在 __main__ 中的哪里创建了 process 类的实例,但创建方式大致如下:



  p = process(buffer,factory)

顺带一提:在 Python 中,类名首字母大写被认为是良好的风格:process -> Process


Can anyone please help me (noob) call the broadcast function of class BroadcastServerFactory from class process, as per the attached code?

I have tried many ways of calling a function from another class, but have found no solution.

import time, sys
from apscheduler.scheduler import Scheduler
import threading
import socket
from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File

from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS


class process(threading.Thread):
    """Daemon thread that sends one fixed message via the global ``factory``.

    The constructor stores ``buffer3`` on the instance; ``run`` does not use
    it and instead broadcasts a hard-coded string.
    """

    def __init__(self, buffer3):
        super(process, self).__init__()
        self.buffer3 = buffer3
        # Mark as daemon so this thread does not keep the interpreter alive.
        self.setDaemon(True)

    def run(self):
        # ``factory`` is not an attribute or argument: it is looked up as a
        # module-level name at call time, so it must exist before the thread
        # is started.
        message = "I don't know what I'm doing!"
        factory.broadcast(message)



class BroadcastServerProtocol(WebSocketServerProtocol):
    """Per-connection protocol that relays text messages through its factory.

    The protocol registers itself with ``self.factory`` in ``onOpen``,
    re-broadcasts non-binary messages in ``onMessage``, and unregisters in
    ``connectionLost``.
    """

    def onOpen(self):
        # Add this connection to the factory's client list.
        self.factory.register(self)

    def onMessage(self, msg, binary):
        # Binary frames are ignored; only text messages are relayed.
        if binary:
            return
        tagged = "'%s' from %s" % (msg, self.peerstr)
        self.factory.broadcast(tagged)

    def connectionLost(self, reason):
        # Let the base class do its own teardown first, then drop this
        # connection from the factory's client list.
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregister(self)


class BroadcastServerFactory(WebSocketServerFactory):
    """Factory that fans every broadcast message out to all registered clients.

    It keeps the registered protocol instances in ``self.clients`` and, from
    construction onwards, emits a numbered "tick" broadcast that reschedules
    itself through ``reactor.callLater(1, ...)``.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(
            self, url, debug=debug, debugCodePaths=debugCodePaths
        )
        self.clients = []
        self.tickcount = 0
        # Kick off the self-rescheduling tick broadcast immediately.
        self.tick()

    def tick(self):
        """Broadcast the next tick number and schedule the following tick."""
        count = self.tickcount + 1
        self.tickcount = count
        self.broadcast("'tick %d' from server" % count)
        reactor.callLater(1, self.tick)

    def register(self, client):
        """Add ``client`` to the client list unless it is already present."""
        if client in self.clients:
            return
        print("registered client " + client.peerstr)
        self.clients.append(client)

    def unregister(self, client):
        """Remove ``client`` from the client list if it is present."""
        if client not in self.clients:
            return
        print("unregistered client " + client.peerstr)
        self.clients.remove(client)

    def broadcast(self, msg):
        """Send ``msg`` to every registered client, logging each delivery."""
        print("broadcasting message '%s' .." % msg)
        for receiver in self.clients:
            receiver.sendMessage(msg)
            print("message sent to " + receiver.peerstr)


class BroadcastPreparedServerFactory(BroadcastServerFactory):
    """Variant of the broadcast factory that prepares each message once.

    ``broadcast`` calls ``self.prepareMessage(msg)`` a single time and hands
    the result to every client via ``sendPreparedMessage``.
    """

    def broadcast(self, msg):
        """Prepare ``msg`` once and send the prepared form to every client."""
        print("broadcasting prepared message '%s' .." % msg)
        prepared = self.prepareMessage(msg)
        for receiver in self.clients:
            receiver.sendPreparedMessage(prepared)
            print("prepared message sent to " + receiver.peerstr)


def testing():
    """Create and start one ``process`` thread carrying the payload "hello".

    Intended to be invoked as a job callback; takes no arguments and returns
    nothing.
    """
    # Was `buffer2 - "hello"`: a subtraction on an undefined name, which
    # raised NameError before the thread could ever be created.
    buffer2 = "hello"
    myDisplay = process(buffer2)
    myDisplay.start()


if __name__ == '__main__':

    # Twisted logging goes to stdout only when the first CLI argument is
    # exactly "debug"; the same flag is forwarded to the factory below.
    debug = len(sys.argv) > 1 and sys.argv[1] == 'debug'
    if debug:
        log.startLogging(sys.stdout)

    # Register testing() as an interval job (seconds=5) and start the
    # scheduler before the servers are set up.
    level_scheduler = Scheduler()
    level_scheduler.add_interval_job(testing, seconds=5)
    level_scheduler.start()

    # Alternative implementation: ServerFactory = BroadcastServerFactory
    ServerFactory = BroadcastPreparedServerFactory

    # `factory` must stay a module-level name: process.run() looks it up
    # as a global.
    factory = ServerFactory(
        "ws://localhost:9000",
        debug=debug,
        debugCodePaths=debug,
    )
    factory.protocol = BroadcastServerProtocol
    factory.setProtocolOptions(allowHixie76=True)
    listenWS(factory)

    # Serve the current directory over HTTP on port 8080.
    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

Thanks

解决方案

Pass the BroadcastServerFactory instance that is to be called to the class instance that calls it (process) when that instance is created:

class process(threading.Thread):
    """Daemon thread that broadcasts one message through an injected factory.

    Args:
        buffer3: Payload stored as ``self.buffer3``; ``run`` does not use it.
        broadcast_server_factory: Object exposing ``broadcast(msg)``. It is
            stored as ``self.factory`` and called from ``run``.
    """

    def __init__(self, buffer3, broadcast_server_factory):
        threading.Thread.__init__(self)
        # Same effect as setDaemon(True); the attribute form also avoids the
        # deprecation warning on newer Python versions.
        self.daemon = True
        # The parameter used to be misspelled ``buffer3m`` while this line
        # read ``buffer3``, so every construction raised NameError.
        self.buffer3 = buffer3

        self.factory = broadcast_server_factory

    def run(self):
        # NOTE(review): this runs in a worker thread. If the factory is a
        # Twisted object, confirm whether the call should be routed through
        # reactor.callFromThread instead of being made directly.
        self.factory.broadcast("I don't know what I'm doing!")

and then call it (it's assigned as self.factory and used in the run method). I can't see where you create a process instance in your __main__, but it will be created with something like:

 p = process(buffer, factory)

Aside: Capitalizing class names is considered good form in Python: process -> Process

这篇关于Python,从另一个类调用类函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆