Python multithreading data race

Question

I'm building a multithreaded system on Python 2.7. Basically, it has three threads and one singleton class that holds the shared data.

Diagram legend: red arrow = invoke; blue arrow = access.

Every thread is a separate class in its own file. The file main.py imports the worker and communication files and the shared data. The main thread then starts the worker class in one thread and the communication class in another. The shared data, which is the single instance of the singleton, is passed to the constructors of the worker class and the communication class.

File main.py

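import communication
from Worker import Worker  # import the class; calling the module itself would fail
from data import Data      # Data is the @Singleton-decorated class defined in data.py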

app_data = Data.Instance()
#...........

SRV = communication.Server(app_data)
SRV.setDaemon(True)
SRV.start()

#...........

while True:
    #...........
    # MUST BE locker.acquire()
    if condition1:
        if condition2:
            job = Worker(app_data, SRV.taskResultSendToSlaves, app_data.ip_table[app_data.cfg.MY_IP]['tasks'].pop())
            job.setDaemon(True)
            job.start()
    # MUST BE locker.release()

File communication.py

import socket
import threading


class Server(threading.Thread):

    # .................

    def __init__(self, data):
        self.data = data
        # .................
        threading.Thread.__init__(self)

    def run(self):
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.settimeout(self.data.cfg.TIMEOUT)
        srv.bind((self.my_addr, self.my_port))
        srv.listen(self.data.cfg.NUMBER_OF_CLIENTS)
        print "Start server"
        while True:
            # HANDLING MESSAGES FROM OTHER PC

    # .................

File Worker.py

import logging
import subprocess
import threading
import time


class Worker(threading.Thread):
    def __init__(self, data, sender, taskname):
        self.data = data
        self.sender = sender
        self.taskname = taskname
        threading.Thread.__init__(self)

    def run(self):
        import thread
        self.data.complete_task.clear()
        tick_before = time.time()
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        p = subprocess.Popen(self.data.cfg.PATH_INTERPRETER + " " + self.data.cfg.PATH_TASKS + self.taskname, startupinfo=startupinfo, shell=False, stdout=subprocess.PIPE)
        job_result, err = p.communicate()
        tick_after = time.time()
        work_time = tick_after - tick_before  
        # MUST BE locker.acquire()      
        self.data.task_table[self.taskname]['status'] = 'complete'
        self.data.task_table[self.taskname]['result'] = job_result
        self.data.task_table[self.taskname]['time'] = work_time
        # MUST BE locker.release()
        logging.debug("%s task is done" % self.taskname)
        tr = threading.Thread(target=self.sender, name="SENDER", args=(self.taskname, ))
        tr.setDaemon(True)
        tr.start()
        tr.join()
        logging.debug("%s task is sent" % self.taskname)
        self.data.complete_task.set()
        thread.exit()

File singletone.py

class Singleton:

    def __init__(self, decorated):
        self._decorated = decorated

    def Instance(self):
        try:
            return self._instance
        except AttributeError:
            self._instance = self._decorated()
            return self._instance

    def __call__(self):
        raise TypeError('Singletons must be accessed through `Instance()`.')

    def __instancecheck__(self, inst):
        return isinstance(inst, self._decorated)

File data.py

#-*- coding: utf-8 -*-
from singletone import Singleton
from configs import Configurations
import threading
import logging


@Singleton
class Data:

    def __init__(self):
        logging.basicConfig(format=u'%(filename)-10s[LINE:%(lineno)d] <%(funcName)-15s> # %(levelname)-8s [%(asctime)s]  %(message)s'.encode('cp1251', 'ignore'), level=logging.DEBUG, filename='mylog.log')
        logging.log(100, '='*120)
        self.cfg = Configurations()
        self.ip_table = self.getIPTable()
        self.task_table = self.getTaskTable()
        self.locker = threading.Lock()
        self.initialization = threading.Event()
        self.initialization.clear()
        self.identification = threading.Event()
        self.identification.clear()
        self.complete_task = threading.Event()
        self.complete_task.set()
        self.flag_of_close = False

    def __str__(self):
        return "\
        {0}\n\
        \n\
        {1}\n\
        \n\
        {2}\n\
        ".format(str(self.cfg), self.strIPTable(), self.strTaskTable())

    def strIPTable(self):
        #return str(self.ip_table)
        result = ["%s = %s" % (key, str(value)) for key, value in self.ip_table.items()]
        result.sort()
        return "\n\t\t".join(result)

    def strTaskTable(self):
        #return str(self.task_table)
        result = ["%s = %s" % (key, str(value)) for key, value in self.task_table.items()]
        result.sort()
        return "\n\t\t".join(result)

    def getIPTable(self):
        result = {}
        if self.cfg.IPS:
            result = dict((item.strip(), {'status': True, 'port': 8000, 'tasks': []}) for item in self.cfg.IPS.split(','))
            # result = dict((item.strip(), {'status': False, 'port': 8000, 'tasks': []}) for item in self.cfg.IPS.split(','))
        result[self.cfg.MY_IP] = {'status': True, 'port': 8000, 'tasks': []}
        return result

    def getTaskTable(self):
        result = {}
        if self.cfg.TASKS:
            result = dict((item.strip(), {'status': 'uncomplete', 'result': '', 'time': 0}) for item in self.cfg.TASKS.split(','))
        return result

    def getTotalCompleteTasks(self):
        result = 0
        for taskname in self.task_table.keys():
            if self.task_table[taskname]['status'] == 'complete':
                result += 1
        return result


if __name__ == '__main__':
    data = Data.Instance()
    print data

I took the Singleton from Stack Overflow.

After starting this system, I sometimes get a data race: the worker thread and the main thread access the shared data at the same time. I think a threading.Lock is needed here. I then made a mistake: I put the Lock object inside the shared data and used it to serialize access. I soon realized my mistake.

The file names have been changed and some pieces of code have been removed.

But now I don't know where to put the Lock object so that every thread can access and use it easily and correctly. Can you give me some advice?
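One common arrangement, shown here only as a sketch and not as the asker's eventual fix, is to keep the lock inside the object that owns the data and to let threads touch the table only through methods that acquire it. `SharedData`, `mark_complete` and `total_complete` are hypothetical names introduced for illustration; the table layout mirrors `task_table` from data.py. The code is written to run on both Python 2.7 and Python 3.

```python
import threading


class SharedData(object):
    """Hypothetical stand-in for Data: owns both the task table and its lock."""

    def __init__(self, tasknames):
        self._lock = threading.Lock()
        self._task_table = dict(
            (name, {'status': 'uncomplete', 'result': '', 'time': 0})
            for name in tasknames)

    def mark_complete(self, taskname, result, work_time):
        # All three fields change under one lock, so a reader that also
        # takes the lock never sees a half-updated entry.
        with self._lock:
            entry = self._task_table[taskname]
            entry['status'] = 'complete'
            entry['result'] = result
            entry['time'] = work_time

    def total_complete(self):
        # Readers take the same lock as writers.
        with self._lock:
            return sum(1 for entry in self._task_table.values()
                       if entry['status'] == 'complete')


def demo():
    data = SharedData(['a.py', 'b.py', 'c.py'])
    workers = [threading.Thread(target=data.mark_complete,
                                args=(name, 'ok', 0.1))
               for name in ['a.py', 'b.py']]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return data.total_complete()
```

Because callers never see the lock, no thread can forget to acquire or release it, and the `with` statement releases it even if the body raises.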

My English is not very good, so please be tolerant. I hope you understand my question.

PS

Besides that, I tried passing a Lock() object to the constructors of the classes, and I had the same trouble. The application crashed somewhere where the data was accessed, and I can't find out exactly where. Each start crashes the application with about 50% probability.
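To locate a crash like this, one option is to catch exceptions at the top of each thread's `run()` and log the full traceback. The sketch below assumes the failure is an ordinary Python exception raised inside a thread; `LoggedThread`, `work` and `Failing` are hypothetical names, and the `KeyError` is a made-up example, not the asker's actual error.

```python
import logging
import threading


class LoggedThread(threading.Thread):
    """Hypothetical base class: logs the traceback of any exception in work()."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.error = None

    def work(self):
        raise NotImplementedError

    def run(self):
        try:
            self.work()
        except Exception as exc:
            # logging.exception records the message plus the full traceback,
            # which shows the file and line where the access failed.
            self.error = exc
            logging.exception("thread %s crashed", self.name)


class Failing(LoggedThread):
    def work(self):
        table = {}
        table['missing']['status'] = 'complete'  # raises KeyError


def demo():
    t = Failing()
    t.start()
    t.join()
    return t.error
```

With the logging configuration from data.py, the traceback would end up in the same log file as the other messages.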

Recommended answer

I found the bug. It was in the Singleton class, but I don't know how to fix it.
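The answer does not say what was wrong with the class. One possibility, stated here as an assumption, is that `Instance()` is not thread-safe: two threads calling it for the first time at the same moment could each construct their own object. A minimal sketch of a lock-guarded variant follows; `Counter` and `demo` are hypothetical and exist only to exercise it.

```python
import threading


class Singleton(object):
    """Decorator-style singleton whose Instance() is guarded by a lock."""

    def __init__(self, decorated):
        self._decorated = decorated
        self._instance = None
        self._lock = threading.Lock()

    def Instance(self):
        # Double-checked locking: the lock is taken only while the
        # instance does not exist yet.
        if self._instance is None:
            with self._lock:
                if self._instance is None:
                    self._instance = self._decorated()
        return self._instance

    def __call__(self):
        raise TypeError('Singletons must be accessed through `Instance()`.')

    def __instancecheck__(self, inst):
        return isinstance(inst, self._decorated)


@Singleton
class Counter(object):
    """Hypothetical decorated class used only for the demonstration."""

    def __init__(self):
        self.value = 0


def demo():
    seen = []

    def grab():
        seen.append(Counter.Instance())

    threads = [threading.Thread(target=grab) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Every thread must have received the very same object.
    return len(seen) == 8 and all(obj is seen[0] for obj in seen)
```

If `Data.Instance()` is only ever called from the main thread before the other threads start, this change alone would not remove the race, and the accesses to `task_table` and `ip_table` would still need a lock.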
