Python threading.Thread can be stopped only with private method self._Thread__stop()
Problem description
I have a function that accepts a large array of x,y pairs as input, does some elaborate curve fitting using numpy and scipy, and then returns a single value. To try to speed things up, I feed the data to two threads using Queue.Queue. Once all the data has been processed, I want the threads to terminate, the calling process to end, and control to return to the shell.
I am trying to understand why I have to resort to a private method of threading.Thread to stop my threads and return control to the command line.
Calling self.join() does not end the program. The only way to get control back was to use the private stop method.
def stop(self):
    print "STOP CALLED"
    self.finished.set()
    print "SET DONE"
    # self.join(timeout=None) does not work
    self._Thread__stop()
Here is an approximation of my code:
import threading
from Queue import Queue  # Python 2 module name

class CalcThread(threading.Thread):
    def __init__(self, in_queue, out_queue, function):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function
        self.finished = threading.Event()

    def stop(self):
        print "STOP CALLED"
        self.finished.set()
        print "SET DONE"
        self._Thread__stop()

    def run(self):
        while not self.finished.isSet():
            # blocks until an item is available
            params_for_function = self.in_queue.get()
            try:
                tm = self.function(params_for_function)
                self.in_queue.task_done()
                self.out_queue.put(tm)
            except ValueError as v:
                # modify params and reinsert into queue
                window = params_for_function["window"]
                params_for_function["window"] = window + 1
                self.in_queue.put(params_for_function)
def big_calculation(well_id, window, data_arrays):
    # do some analysis to calculate tm
    return tm
if __name__ == "__main__":
    NUM_THREADS = 2
    workers = []
    in_queue = Queue()
    out_queue = Queue()
    for i in range(NUM_THREADS):
        w = CalcThread(in_queue, out_queue, big_calculation)
        w.start()
        workers.append(w)
    if options.analyze_all:
        for i in well_ids:
            in_queue.put(dict(well_id=i, window=10, data_arrays=my_data_dict))
    in_queue.join()
    print "ALL THREADS SEEM TO BE DONE"
    # gather data and report it from out_queue
    for i in well_ids:
        p = out_queue.get()
        print p
        out_queue.task_done()
        # I had to do this to get the out_queue to proceed
        if out_queue.qsize() == 0:
            out_queue.join()
            break
    # Calling this stop method does not seem to return control to the
    # command line unless I use the threading.Thread private method
    for aworker in workers:
        aworker.stop()
In general, it is a bad idea to kill a thread that modifies a shared resource.
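A common alternative to killing a thread is to let it exit `run()` on its own. In the question's code the worker is most likely parked inside `in_queue.get()`, which blocks indefinitely on an empty queue, so the `finished` event is never re-checked and `join()` never returns. Below is a minimal sketch of one way around that, written in Python 3 syntax. The `_STOP` sentinel, the `Worker` class and the `run_workers` helper are hypothetical names introduced for illustration, not part of the original code.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel meaning "no more work"


class Worker(threading.Thread):
    def __init__(self, in_queue, out_queue, function):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function

    def run(self):
        while True:
            item = self.in_queue.get()  # blocks until an item arrives
            if item is _STOP:
                break  # leave run(); the thread then ends by itself
            self.out_queue.put(self.function(item))


def run_workers(function, items, num_threads=2):
    """Process items on num_threads workers and return the results (unordered)."""
    items = list(items)
    in_queue, out_queue = queue.Queue(), queue.Queue()
    workers = [Worker(in_queue, out_queue, function) for _ in range(num_threads)]
    for w in workers:
        w.start()
    for item in items:
        in_queue.put(item)
    for _ in workers:
        in_queue.put(_STOP)  # one sentinel per worker, queued after the real work
    for w in workers:
        w.join()  # returns because every run() loop has exited
    return [out_queue.get() for _ in items]
```

Because each worker consumes exactly one sentinel, no private method is needed: `join()` returns once the queue has drained. Retrying on `ValueError`, as in the question, would need extra care so that a retried item is not queued behind the sentinels.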
CPU-intensive tasks in multiple threads are worse than useless in Python unless the GIL is released while the computations run. Many numpy functions do release the GIL.
Here is an example adapted from the ThreadPoolExecutor example in the docs:
import concurrent.futures  # on Python 2.x: pip install futures

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                          for args in calc_args)
    while future_to_args:
        # iterate over a snapshot, since the dict is modified inside the loop
        for future in concurrent.futures.as_completed(list(future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    # modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation, args)] = args
            else:
                print('f%r returned %r' % (args, future.result()))

print("ALL work SEEMs TO BE DONE")
You could replace ThreadPoolExecutor with ProcessPoolExecutor if there is no shared state. Put the code in your main() function.
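To make the last point concrete, here is a hedged, self-contained sketch in which the executor class is a parameter, so threads and processes can be swapped without touching the retry loop. The body of `big_calculation` is a hypothetical stand-in for the real curve fit (it fails until the window reaches 12), and `run_all` and its `executor_cls` parameter are names invented for this illustration.

```python
import concurrent.futures


def big_calculation(args):
    # Hypothetical stand-in for the real analysis: raises until the window is wide enough.
    if args["window"] < 12:
        raise ValueError("window too small")
    return args["well_id"] * args["window"]


def run_all(calc_args, executor_cls=concurrent.futures.ThreadPoolExecutor,
            max_workers=2):
    """Run big_calculation for every args dict, retrying with a wider window on ValueError."""
    results = {}
    with executor_cls(max_workers=max_workers) as executor:
        pending = {executor.submit(big_calculation, args): args for args in calc_args}
        while pending:
            # iterate over a snapshot, since pending changes inside the loop
            for future in concurrent.futures.as_completed(list(pending)):
                args = pending.pop(future)
                error = future.exception()
                if isinstance(error, ValueError):
                    # widen the window and resubmit a fresh copy of the parameters
                    args = dict(args, window=args["window"] + 1)
                    pending[executor.submit(big_calculation, args)] = args
                elif error is not None:
                    raise error  # this sketch chooses not to swallow other errors
                else:
                    results[args["well_id"]] = future.result()
    return results


def main():
    calc_args = [dict(well_id=i, window=10) for i in range(1, 4)]
    print(run_all(calc_args))


if __name__ == "__main__":
    main()
```

Passing `executor_cls=concurrent.futures.ProcessPoolExecutor` should work as long as `big_calculation` is defined at module level and its arguments and results can be pickled. The `if __name__ == "__main__"` guard matters in that case, because worker processes may re-import the module.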