如何在wxPython GUI应用程序中实现线程 [英] How to implement a thread in a wxPython GUI application

查看:83
本文介绍了如何在wxPython GUI应用程序中实现线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我无法正确实现线程,以防止我的应用程序锁定并出现怪异行为.该应用程序旨在登录到基于ubuntu的服务器或ubuntu嵌入式服务器,并搜索可能没有问题的日志文件.嵌入式服务器可以工作,但是在进行搜索时,该应用程序仍会锁定.站点服务器将不会处理.我尚未编写本地文件搜索代码.一旦弄清楚如何实现线程,我想添加一个进度条.自从我学习和使用Python几个月以来,我认为这将是直截了当的,但是使用GUI却存在挑战.我仍然是新手,对所有批评持开放态度.它只会帮助我成为一个更好的程序员.任何帮助是极大的赞赏.这是下面的代码:

I'm having trouble implementing a thread correctly to keep my application from locking up and experiencing weird behavior. The app is designed to log into a ubuntu based server or ubuntu embedded server and search for log files that may be in the clear. The embedded server works, but the app keeps locking up while the search is occurring. The siteserver will not process. I have yet to code the local file search. I would like to add a progress bar once I figure out how to implement threads. I thought this would be straight forward since I've been learning and working with Python for several months now, but working with a GUI has its challenges. I'm still a neophyte and open to all the criticisms; it only helps me to become a better programmer. Any help is greatly appreciated. Here is the code below:

#!c:\python27

import wx
import os
import re
import paramiko
import string
import fileinput
import os.path
import dircache
import sys
import time
import datetime, time
import wx

from wxGui import *



class MyApp(wx.App):
    def OnInit(self):
        frame = MyFrame("SecureTool v2.0.0", (50, 60), (458, 332))
        frame.Show()
        self.SetTopWindow(frame)
        return True



class MyFrame(wx.Frame):
    def __init__(self, title, pos, size):
        wx.Frame.__init__(self, None, -1, title, pos, size)

        toolbar = self.CreateToolBar()
        toolbar.Realize()
        menuFile = wx.Menu()
        menuFile.Append(1, "&About...")
        menuFile.AppendSeparator()
        menuFile.Append(2, "E&xit")
        menuBar = wx.MenuBar()
        menuBar.Append(menuFile, "&File")
        menu2 = wx.Menu()
        menu2.Append(wx.NewId(), "&Copy", "Copy in status bar")
        menu2.AppendSeparator()
        menu2.Append(wx.NewId(), "C&ut", "")
        menu2.AppendSeparator()
        menu2.Append(wx.NewId(), "Paste", "")
        menu2.AppendSeparator()
        menu2.Append(wx.NewId(), "&Options...", "Display Options")
        menuBar.Append(menu2, "&Edit")

        self.SetMenuBar(menuBar)
        self.CreateStatusBar()
        self.SetStatusText("Welcome to SecureTool!")
        self.Bind(wx.EVT_MENU, self.OnAbout, id=1)
        self.Bind(wx.EVT_MENU, self.OnQuit, id=2)
        panel = wx.Panel(self)
        panel.SetBackgroundColour('LIGHT GREY')
        #Close button
        button = wx.Button(panel, label="EXIT", pos=(229, 160), size=(229, 80))
        self.Bind(wx.EVT_BUTTON, self.OnQuit, button)
        self.Bind(wx.EVT_CLOSE, self.OnCloseWindow)
        #Embed Server button
        button2 = wx.Button(panel, label="Embed Server", pos=(0, 160), size=(229, 80))
        self.Bind(wx.EVT_BUTTON, self.OnIP, button2)
        #Site Server
        button3 = wx.Button(panel, label="SITESERVER", pos=(0, 80), size=(229, 80))
        self.Bind(wx.EVT_BUTTON, self.OnUsrPswd, button3)
        #Local Search
        button4 = wx.Button(panel, label="LOCAL SEARCH", pos=(229, 80), size=(229, 80))
        self.Bind(wx.EVT_BUTTON, self.OnOpen, button4)

        EVT_RESULT(self, self.OnResult)
        self.worker = None        

    def OnIP(self, event):
        ip_address = 0
        result = ''
        dlg = wx.TextEntryDialog(None, "Enter the IP Address.",
        'Embed Server Connect', 'xxx.xxx.xxx.xxx')
        if dlg.ShowModal() == wx.ID_OK:
            ip_address = dlg.GetValue()
        if ip_address:    
            cmsg = wx.MessageDialog(None, 'Do you want to connect to: ' + ip_address,
                                    'Connect', wx.YES_NO | wx.ICON_QUESTION)
            result = cmsg.ShowModal()

        if result == wx.ID_YES:
            self.DispConnect(ip_address)

            cmsg.Destroy()
        dlg.Destroy()
        return True

    def OnUsrPswd(self, event):
        passwrd = 0
        result = ''
        result = wx.TextEntryDialog(None, 'Enter Weekly Password', 'Site Server login','')
        if result.ShowModal() == wx.ID_OK:
            passwrd = result.GetValue()
        if passwrd:
            psmsg = wx.MessageDialog(None, 'Do you want to connect to the Siteserver?', 'Connect',
                                 wx.YES_NO | wx.ICON_QUESTION)
            result = psmsg.ShowModal()
        if result == wx.ID_YES:
            self.SiteserverConnect(passwrd)

            psmsg.Destroy()
        result.Destroy()
        return True

    def ErrMsg(self):
        ermsg = wx.MessageDialog(None, 'Invalid Entry!', 'ConnectionDialog', wx.ICON_ERROR)
        ermsg.ShowModal()
        ermsg.Destroy()

    def GoodConnect(self):
        gdcnt = wx.MessageDialog(None, 'You are connected!', 'ConnectionStatus', wx.ICON_INFORMATION)
        gdcnt.ShowModal()
        #if gdcnt.ShowModal() == wx.ID_OK:
        gdcnt.Destroy()

    def OnFinish(self):
        finish = wx.MessageDialog(None, 'Job is finished!', 'WorkStatus', wx.ICON_INFORMATION)
        finish.ShowModal()
        finish.Destroy()


    def DispConnect(self, address):
        pattern = r"\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
        port = 22
        user = 'root'
        password ='******'
        if re.match(pattern, address):
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(address,port,user,password)
            Ssh = ssh
            self.GoodConnect()
            self.OnSearch(Ssh)
        else:
            self.ErrMsg()

    def SiteserverConnect(self, password):
        port = 22
        user = 'root2'
        address = '10.5.48.2'
        if password:
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(address,port,user,password)
            Ssh = ssh
            self.GoodConnect()
            self.OnSiteSearch(Ssh)
        else:
            self.ErrMsg()

    def startWorker(self,a, b, c):
        self.button2.Disable()
        self.thread = Thread(target=self.LongRunningSearch)
        self.thread.start()

    def OnSearch(self, sssh):
        self.startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])
        self.OnFinish()

    def LongRunningSearch(sssh):
        ssh = sssh
        apath = '/'
        apattern = '"*.txt" -o -name "*.log"' 
        rawcommand = 'find {path} -name "*.txt" -o -name "*.log"' 
        command1 = rawcommand.format(path=apath, pattern=apattern)
        stdin, stdout, stderr = ssh.exec_command(command1)
        filelist = stdout.read().splitlines()
        ftp = ssh.open_sftp()
        for afile in filelist:
            (head, filename) = os.path.split(afile)

        paths = '/dispenser_result.log'
        temp = ftp.file(paths, 'w')
        from time import strftime
        temp.write("{0:^75}".format("Company -Security Report" ) + strftime("    %Y-%m-%d %H:%M:%S") + "\n\n")   
        ustring = wx.TextEntryDialog(None, 'Enter a search string below:', 'Search', 'String Name')
        if ustring.ShowModal() == wx.ID_OK:
            userstring = ustring.GetValue()

        if userstring:
        userStrHEX = userstring.encode('hex')
        userStrASCII = ''.join(str(ord(char)) for char in userstring)
        regex = re.compile(r"(%s|%s|%s)" % ( re.escape( userstring ), re.escape( userStrHEX ), re.escape( userStrASCII )))      
    else:
        sys.exit('You Must Enter A String!!!')

    count = 0
    for afile in filelist:
        (head, filename) = os.path.split(afile)
        if afile.endswith(".log") or afile.endswith(".txt"):
            f=ftp.open(afile, 'r')
            for i, line in enumerate(f.readlines()):
                result = regex.search(line)
                if result:
                    count += 1
                    ln = str(i)
                    pathname = os.path.join(afile)
                    template = "\n\nLine: {0}\nFile: {1}\nString Type: {2}\n\n"
                    output = template.format(ln, pathname, result.group())
                    ftp.get(afile, 'c:\\Extracted\\' + filename)
                    temp.write(output)
                    break
            else:
                #print "No Match in: " + os.path.join(afile)
                temp.write("\nNo Match in: " + os.path.join(afile))
            f.close()

    for fnum in filelist:
        #print "\nFiles Searched: ", len(filelist)
        #print "Files Matched: ", count
        num = len(filelist)

        temp.write("\n\nFiles Searched:" + '%s\n' % (num))
        temp.write("Files Matched:"+ '%s\n' % (count))
        temp.write("Search String:"+ '%s\n' % (userstring))
        break
        temp.close()
    defaultFolder = "DispenserLogResults"
    if not defaultFolder.endswith(':') and not os.path.exists('c:\\Extracted\\DispenserLogResults'):
        os.mkdir('c:\\Extracted\\DispenserLogResults')
    else:
         pass
    ftp.get(paths, 'c:\\Extracted\\DispenserLogResults\\dispenser_result.log')

    ftp.remove(paths)

    re.purge()
    ftp.close()
    ssh.close()

    def OnSiteSearch(self, sssh):
        ssh = sssh
        apath = '/var/log/apache2 /var/opt/smartmerch/log/'
        apattern = '"*.log"' 
        rawcommand = 'find {path} -type f -name "*.log"' 
        command1 = rawcommand.format(path=apath, pattern=apattern)
        stdin, stdout, stderr = ssh.exec_command(command1)
        filelist = stdout.read().splitlines()
        ftp = ssh.open_sftp()
        for afile in filelist:
            (head, filename) = os.path.split(afile)

        paths = '/var/tmp/siteserver_result.log'
        temp = ftp.file(paths, 'w')
        from time import strftime
        temp.write("{0:^75}".format("Gilbarco - SQA Security Report" ) + strftime("    %Y-%m-%d %H:%M:%S") + "\n\n")   
        temp.write("\n{0:^75}".format("SiteServer Logs" ))
        ustring = wx.TextEntryDialog(None, 'Enter a search string below:', 'Search', 'String Name')
        if ustring.ShowModal() == wx.ID_OK:
            userstring = ustring.GetValue()

        if userstring:
            userStrHEX = userstring.encode('hex')
            userStrASCII = ''.join(str(ord(char)) for char in userstring)
            regex = re.compile(r"(%s|%s|%s)" % ( re.escape( userstring ), re.escape( userStrHEX ), re.escape( userStrASCII )))      
        else:
            sys.exit('You Must Enter A String!!!')

        count = 0
        for afile in filelist:
            (head, filename) = os.path.split(afile)
            if afile.endswith(".log") or afile.endswith(".txt"):
                f=ftp.open(afile, 'r')
                for i, line in enumerate(f.readlines()):
                    result = regex.search(line)
                    if result:
                        count += 1
                        ln = str(i)
                        pathname = os.path.join(afile)
                        template = "\n\nLine: {0}\nFile: {1}\nString Type: {2}\n\n"
                        output = template.format(ln, pathname, result.group())
                        ftp.get(afile, 'c:\\Extracted\\' + filename)
                        temp.write(output)
                        break
                else:
                    temp.write("\nNo Match in: " + os.path.join(afile))
                f.close()

        for fnum in filelist:
            num = len(filelist)

            temp.write("\n\nFiles Searched:" + '%s\n' % (num))
            temp.write("Files Matched:"+ '%s\n' % (count))
            temp.write("Search String:"+ '%s\n' % (userstring))
            break
            temp.close()
        defaultFolder = "SiteServerLogResults"
        if not defaultFolder.endswith(':') and not os.path.exists('c:\\Extracted\\SiteServerLogResults'):
            os.mkdir('c:\\Extracted\\SiteServerLogResults')
        else:
            pass
        ftp.get(paths, 'c:\\Extracted\\SiteServerLogResults\\siteserver_result.log')

        ftp.remove(paths)

        re.purge()
        ftp.close()
        ssh.close()
        self.OnFinish()

    def OnOpen(self,e):
        self.dirname = ''
        dlg = wx.FileDialog(self, "Choose a file", self.dirname, "", "*.*", wx.OPEN)
        if dlg.ShowModal() == wx.ID_OK:
            self.filename = dlg.GetFilename()
            self.dirname = dlg.GetDirectory()
            f = open(os.path.join(self.dirname, self.filename), 'r')
            self.control.SetValue(f.read())
            f.close()
        dlg.Destroy()   

    def OnQuit(self, event):
        self.Close(True)

    def OnAbout(self, event):
        wx.MessageBox("This is sQAST v2.0.0",
            "About secureTool", wx.OK | wx.ICON_INFORMATION, self)



    def OnCloseWindow(self, event):
        self.Destroy()

if __name__ == '__main__':
    app = MyApp(False)
    app.MainLoop()

运行后出现跟踪错误:

Traceback (most recent call last):
File "C:\SQA_log\wxGui.py", line 87, in OnIP
self.DispConnect(ip_address)
File "C:\SQA_log\wxGui.py", line 143, in DispConnect
self.OnSearch(Ssh)
File "C:\SQA_log\wxGui.py", line 169, in OnSearch
self.startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])

推荐答案

在您的特定情况下,我会做类似的事情:

In your particular case I would do something like:

1)封装长任务并将其与GUI反应分开,简化您的方法:

1) Encapsulate the long task and separate it from the GUI reaction, simplify your method:

def OnSearch(self, sssh):
    self.LongRunningSearch(sssh) # Move all the blocking code here,
                                 # just NOT the GUI reaction !
                                 # Meaning self.OnFinish()...
    self.OnFinish()

2)确认它仍然可以正常运行.然后修改方法以添加线程:

2) Verify that it still runs fine. Then modify the method to add the thread:

def OnSearch(self, sssh):
    startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])

self.OnSearch将立即结束,并且在运行self.LongRunningSearch的线程完成后将调用self.OnFinish.由于我无法在计算机上运行您的代码,因此可能仍需要进行一些调整.

self.OnSearch will end immediately and self.OnFinish will be called after the thread running self.LongRunningSearch has finished. It may still need some tuning as I am unable to run your code on my computer.

这篇关于如何在wxPython GUI应用程序中实现线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆