This Non-Threaded script unexpectedly runs faster than the Threaded version


Problem Description

I have a Python script which validates data fetched from some rows in a database and then logs the errors in a different table in the same database. The script validates each row and marks it as validated & has error = True/False depending on the validation outcome. This process is repeated for each row. With that, I thought I'd add some steroids by creating threads such that the validation of each row is done by an independent thread, thus reducing the time it takes to validate a batch of rows.

To my surprise, I find that the threaded script takes slightly longer than the non-threaded one. On average, validating 1502 rows of data takes the non-threaded script 1.5 seconds while the threaded script takes 2.27 seconds. That might not be much, but ideally I'll be running through 2 million records at a go, so that time overhead will be significant. That, plus I would assume that threaded apps would finish faster! :-)

The two scripts clock the same time of about 0.01 seconds up to the point of creating threads. By this point the SQLAlchemy session is created and all the data to be validated, and its relations (i.e. foreign keys etc.), are fetched. From there though, the non-threaded script finishes faster. Below is my code.

1.0 Non-Threaded Script

#A lot of code goes above this to fetch the data that is passed on to the validator function
#However, the two scripts are the same up to this point in regards to time taken, so I didn't see the need to post it.
for lf_detail_id in load_file_detail_id:
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
           data[lf_detail_counter], template_version[lf_counter], \
           load_file_detail, error, dt_file, dt_columns 
    data_list.append(params)
    lf_detail_counter += 1
    no_of_records += 1

validator = Validate()
validator.validator(no_of_records, data_list)
record_counter += lf_detail_counter
data_list = None
no_of_records = 0
print("Validated '%s': seconds %s" %(filename[lf_counter], time.time()-file_start_time))     #print time it took to run'

#Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)

#Reset counters
lf_detail_counter = 0
lf_counter += 1

#Commit The Entire Transaction.
session.commit()
print("NoThread:Finished validating %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()- process_start_time))

1.1. Validation Function of the Non-Threaded Script

class Validate():
    has_error = None
    def validator(self, loop_length, job):                
        '''Validate data'''
        for row_counter in range(loop_length):
            load_file_detail_id, load_file_id, entry_number, data, \
            template_version, load_file_detail, error, dt_file, dt_columns = job[row_counter]
            error_detail = ErrorLogDetail()
            if data.strip() == "":
                error_detail.errorlog = error
                error_detail.load_file_detail_id = load_file_detail_id
                error_detail.pos_row = entry_number
                error_detail.pos_col = None
                error_detail.value_provided = None
                error_detail.column_name = None
                error_detail.value_provided = None
                error_detail.description = "error message 1"
                session.add(error_detail)
                error_detail = ErrorLogDetail()
                self.has_error = True
                self.set_validation(load_file_detail, load_file_detail_id, True, False)
                continue
            elif len(data) != int(dt_file.data_length):
                error_detail.errorlog = error
                error_detail.load_file_detail_id = load_file_detail_id
                error_detail.pos_row = entry_number   
                error_detail.pos_col = None
                error_detail.column_name = None
                error_detail.value_provided = None
                error_detail.description = "error message 2"
                session.add(error_detail)
                error_detail = ErrorLogDetail()
                self.has_error = True
                self.set_validation(load_file_detail, load_file_detail_id, True, False)  
                continue
            else:
                #Continue with extra validation
                pass

            #If the record passes all validation then mark it as has_error = False
            if self.has_error == False:
                self.set_validation(load_file_detail, load_file_detail_id, False, True)
            else:
                self.has_error = False
            #(In the threaded script, jobs.task_done() is called at this point; that line does not appear in the non-threaded script.)

2.0 Threaded Script

#A lot of code goes above this to fetch the data that is passed on to the validator function
#However, the two scripts are the same up to this point in regards to time taken, so I didn't see the need to post it.
for lf_detail_id in load_file_detail_id:
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
           data[lf_detail_counter], template_version[lf_counter], \
           load_file_detail, error, dt_file, dt_columns 
    data_list.append(params)
    lf_detail_counter += 1
    queue_size += 1
    if queue_size == THREAD_LIMIT:
        myqueuing(queue_size, data_list)
        queue_size = 0

#spawn a pool of threads, and pass them queue instance 
if queue_size > 0:
    myqueuing(queue_size, data_list)

#Keep record of rows processed
record_counter += lf_detail_counter 
print("Validated '%s': seconds- %s " %(filename[lf_counter], time.time()-file_start_time))     #print time it took to run'

#Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)

#Commit The Entire Transaction.
session.commit()
#Reset counters
lf_detail_counter = 0
lf_counter += 1
data_list = None
queue_size = 0              
print("HasThread:Finished loading %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()-process_start_time))     #print time it took to run'

2.1. Threaded Validation Function

THREAD_LIMIT = 50                # This is how many threads we want
jobs = queue.Queue()           # This sets up the (unbounded) queue object that holds the jobs
singlelock = threading.Lock()   # This is a lock so threads don't print through each other (and other reasons)

def myqueuing(queuesize, data):
    '''Put the fetched data in a queue and instantiate threads to
    process the queue'''
    # Spawn the threads
    is_valid_date("20131212", True) #Calling this here to avoid a bug in time.strptime() when threading
    for x in range(queuesize):
        # This is the thread class that we instantiate.
        workerbee().start()

    # Put stuff in queue
    for i in range(queuesize):
        # Block if the queue is full and wait up to 2 seconds; after that a queue.Full error is raised.
        try:
            jobs.put(data[i], block=True, timeout=2)
        except queue.Full:
            singlelock.acquire()
            print ("The queue is full !")
            singlelock.release()

    # Wait for the threads to finish
    singlelock.acquire()        # Acquire the lock so we can print
    print ("Waiting for threads to finish.")
    singlelock.release()       # Release the lock
    jobs.join()                 # This command waits for all threads to finish.             


class workerbee(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.has_error = False

    def run(self):
        #try:
        job = jobs.get(True,1)
        load_file_detail_id, load_file_id, entry_number, data, \
        template_version, load_file_detail, error, dt_file, dt_columns = job                
        '''Validates the data.'''
        error_detail = ErrorLogDetail()
        #Again please note that this part is identical for both the non-threaded and the threaded script. 
        #After each pass on a record, the record is marked as validated and if has_error = True
        if data.strip() == "":
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number
            error_detail.pos_col = None
            error_detail.value_provided = None
            error_detail.column_name = None
            error_detail.value_provided = None
            error_detail.description = "erro message1"
            session.add(error_detail)
            error_detail = ErrorLogDetail()
            self.has_error = True
            self.set_validation(load_file_detail, load_file_detail_id, True, True)     
        elif len(data) != int(dt_file.data_length):
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number   
            error_detail.pos_col = None
            error_detail.column_name = None
            error_detail.value_provided = None
            error_detail.description = "erro message2")
            session.add(error_detail)
            error_detail = ErrorLogDetail()
            self.has_error = True
            self.set_validation(load_file_detail, load_file_detail_id, True, True)    
        else:
            #Continue with further validation - about 5 other validation checks
            pass

        #If the record passes all validation then mark it as has_error = False
        if self.has_error == False:
            self.set_validation(load_file_detail, load_file_detail_id, False, True)
        else:
            self.has_error = False
        jobs.task_done()    #The job is marked as done; this call only appears in the threaded script, not in the non-threaded one

3.0. Common Function Used by Both the Threaded and Non-Threaded Scripts to Set Validation

def set_validation(self, load_file_detail, load_file_detail_id, has_error, can_be_loaded):
    '''Mark the record as having been validated and whether has error = True or False'''
    #print("haserror and canbeloaded ", has_error, can_be_loaded)
    is_done = load_file_detail.set_validation_and_error(load_file_detail_id, True, has_error, can_be_loaded)
    if is_done == False:
        raise Exception ("Can't update load_file_detail's is_validated parameter: ", load_file_detail_id)                   

3.1. The Actual SQLAlchemy Session Call Used to Save the Validation Status

def set_validation_and_error(self, load_file_detail_id, is_validated, has_error, can_be_loaded):
    result = session.execute('UPDATE load_file_detail SET is_validated=%s, has_error=%s, can_be_loaded=%s WHERE id=%s' \
                    %(is_validated, has_error, can_be_loaded, load_file_detail_id))

So, the fetching of the data to be validated is the same, and both scripts take the same amount of time up to that point. The validation process is the same for both scripts, and saving to the DB is the same, i.e. sections 3.0 and 3.1 are shared by both scripts. The only difference is the validation with multiple threads. So I'm thinking maybe there is something about multiple threads and SQLAlchemy that is making the app slower in threaded mode? Or have I implemented the threaded function in an improper way? It's one of those, or threading is simply not suitable in this scenario. Suggestions welcome.

Recommended Answer

You should create a Queue for logging and add a "logger" thread. That way you can remove the locks, and the code should be faster.
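For illustration, here is a minimal sketch of that pattern, assuming Python 3's queue and threading modules; the names log_queue and log_worker are made up for this example and are not part of the original code. Worker threads put messages on the queue instead of acquiring a print lock, and a single logger thread drains it:

import queue
import threading

log_queue = queue.Queue()               # workers put log messages here instead of printing directly

def log_worker():
    '''Single consumer: drains the queue so workers never block on a print lock.'''
    while True:
        message = log_queue.get()
        if message is None:             # sentinel value tells the logger to shut down
            log_queue.task_done()
            break
        print(message)
        log_queue.task_done()

logger_thread = threading.Thread(target=log_worker, daemon=True)
logger_thread.start()

# A worker thread then simply does:
#     log_queue.put("The queue is full !")
# and when all work is finished the main thread stops the logger with:
#     log_queue.put(None)
#     log_queue.join()

The same idea works for database writes: instead of every worker calling session.add(), workers could put error rows on a queue and a single writer thread could own the session.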

Also create a DB connection in each thread, so that data can be fetched in parallel.
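One way to do that with SQLAlchemy is a scoped_session, which gives every thread its own Session bound to a shared engine. This is only a sketch under that assumption; the connection URL and the worker function are placeholders, not part of the original code:

import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Placeholder URL; substitute the real database connection string used by the script.
engine = create_engine("sqlite:///validation.db")

# scoped_session hands each thread its own Session object on first use.
Session = scoped_session(sessionmaker(bind=engine))

def worker(rows):
    session = Session()                 # this thread's private session
    try:
        for row in rows:
            session.add(row)            # e.g. session.add(ErrorLogDetail(...))
        session.commit()
    finally:
        Session.remove()                # dispose of the thread-local session, returning its connection

# Usage sketch: each thread gets its own chunk of rows and therefore its own session.
#     threading.Thread(target=worker, args=(chunk,)).start()

With one global session shared by 50 threads, as in the question's code, every session.add() call funnels through a single object that is not designed for concurrent use, which by itself can erase any gain from threading.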

Threads parallelize only C-library calls (such as blocking I/O), because of the GIL.
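A standalone sketch (not part of the original answer) that makes the GIL's effect visible: a CPU-bound function run in two threads takes about as long as running it twice sequentially, because only one thread executes Python bytecode at a time.

import threading
import time

def cpu_bound(n=5000000):
    '''Pure-Python busy loop; it rarely releases the GIL.'''
    total = 0
    for i in range(n):
        total += i
    return total

start = time.time()
cpu_bound()
cpu_bound()
print("sequential:  %.2f s" % (time.time() - start))

start = time.time()
threads = [threading.Thread(target=cpu_bound) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("two threads: %.2f s (roughly the same, because of the GIL)" % (time.time() - start))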

To parallelize Python code you must use multiprocessing.

I wrote a test for you that shows how to handle the iterable:

def produce_data(amount=100000, invalid=1, valid=10): 
# produce_data = sql('request').getall()
    import random
    id = 0
    data = [True]*valid + [False]*invalid
    while id < amount:
        id+=1
        yield (id,random.choice(data))


def validate(row):
    if row[1]:
        time.sleep(0.001) #set valid sql request emulation.
        return True
    else:
        time.sleep(0.001) #set invalid sql request emulation.
        return False



def single():
    for row in produce_data():
        validate(row)

def targeted():
    import threading
    for row in produce_data():
        threading.Thread(target=validate,args=(row,)).start()  # one short-lived thread per row

Uley = 50

class Bee(object):
        error=False
        running = True
        def __init__(self,queue,*args,**kwargs):
            self.queue=queue #dont use any global variable!
            # every bee must have unique db connection and session.
            #self.session = db.connection().session()
            # initialize it there.
            return super(Bee,self).__init__(*args,**kwargs)

        def run(self):
            while self.running:
                data=self.queue.get()
                if data: 
                    self.error = validate(data) # refactor it to self.validate(data) to be able to get cursor from self.session.
                    self.queue.task_done()
                else:
                    self.queue.task_done()
                    break

            #self.session.commit()                  


def treaded():
    import threading,Queue

    class TreadedBee(Bee,threading.Thread): pass

    q = Queue.Queue()

    for i in range(Uley): #bees started before data was provided.
        bee=TreadedBee(q)
        bee.daemon = True
        bee.start()

    for row in produce_data(): #you don't need to fetch all the data before processing starts; in real code this would be a cursor over the response.
        q.put(row)

    q.join()
    for i in range(Uley):
        q.put(None)


def forked():
    from multiprocessing import Process,JoinableQueue
    class ForkedBee(Bee,Process): pass

    q = JoinableQueue()
    for i in range(Uley):
        bee=ForkedBee(q)
        bee.start()

    for row in produce_data():
        q.put(row)

    q.join()
    #at this point you need to kill the zomBee workers :-)
    for i in range(Uley):
        q.put(None)
    q.close()

def pool():
    from multiprocessing import Pool
    pool = Pool(processes=Uley)
    pool.map(validate,produce_data())

if __name__ == "__main__":
    import time
    s=time.time()
    single() 
    print(time.time()-s) #109
    s=time.time()
    targeted()
    print(time.time()-s) #6
    s=time.time()
    treaded()
    print(time.time()-s) #12
    s=time.time()
    forked()
    print(time.time()-s) #6
    s=time.time()
    pool() 
    print(time.time()-s) #4

Test results:

$ python2 tgreads.py 
109.779700994
5.84457302094
12.3814198971
5.97618508339
3.69856286049

targeted will flood the CPU and memory, and you can't provide individual connections to the DB; using a shared connection is not safe. If you want to go that way, you need to provide an output queue and implement a collector that communicates with the DB. pool is the shortest code and the fastest, but it is not friendly to setting up per-worker connections.
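If the pool approach is otherwise attractive, one workaround (a sketch under assumptions, written for Python 3, not part of the original answer) is Pool's initializer hook, which runs once in every worker process and can open that worker's own DB connection; connect() below is a stand-in for real connection setup:

from multiprocessing import Pool

_connection = None                      # one connection per worker process, set by the initializer

def connect():
    '''Stand-in for real connection setup, e.g. create_engine(...).connect().'''
    return object()

def init_worker():
    global _connection
    _connection = connect()             # runs exactly once in each worker process

def validate_row(row):
    # _connection is available here without being passed through the queue
    return row % 2 == 0

if __name__ == "__main__":
    with Pool(processes=4, initializer=init_worker) as pool:
        print(pool.map(validate_row, range(10)))

This keeps the brevity of pool.map() while still giving every worker its own connection, which is what the Bee classes above achieve with an explicit per-worker session.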
