Python multi-threaded processing with limited CPU/ports
Question
I have a dictionary of folder names that I would like to process in parallel. Under each folder, there is an array of file names that I would like to process in series:
folder_file_dict = {
    folder_name : {
        file_names_key : [file_names_array]
    }
}
Ultimately, I will be creating a folder named folder_name which contains len(folder_file_dict[folder_name][file_names_key]) files. I have a method like so:
def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)
        # create "file_name"

udp_ports = [123, 456, 789]
Note the time_consuming_method() above, which takes a long time due to calls over a UDP port. I am also limited to using the UDP ports in the array above. Thus, I have to wait for time_consuming_method to complete on a UDP port before I can use that UDP port again. This means that I can only have len(udp_ports) threads running at a time.
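The "wait for a port to free up" constraint can be expressed with a thread-safe queue used as a pool of port tokens: a worker blocks on get() until some port is free and returns it with put() when done. A minimal sketch of that pattern follows; do_work is a hypothetical stand-in for the real time_consuming_method, and the import fallback keeps it runnable on both Python 2.7 and 3:

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7

udp_ports = [123, 456, 789]
free_ports = queue.Queue()
for p in udp_ports:
    free_ports.put(p)

results = []
results_lock = threading.Lock()

def do_work(item, port):
    # hypothetical stand-in for time_consuming_method(item, port)
    return (item, port)

def worker(item):
    port = free_ports.get()       # blocks until some port is free
    try:
        r = do_work(item, port)
        with results_lock:
            results.append(r)
    finally:
        free_ports.put(port)      # release the port for another thread

# 6 jobs, but never more than len(udp_ports) == 3 running concurrently
threads = [threading.Thread(target=worker, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(results))  # 6
```

Because the ports live in the queue rather than being assigned one-per-thread, any number of threads can share the three ports safely.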
Thus, I will ultimately create len(folder_file_dict.keys()) threads, with len(folder_file_dict.keys()) calls to process_files_in_series. I also have a MAX_THREAD count. I am trying to use the Queue and Threading modules, but I am not sure what kind of design I need. How can I do this using Queues and Threads, and possibly Conditions as well? A solution that uses a thread pool may also be helpful.
NOTE: I am not trying to increase the read/write speed. I am trying to parallelize the calls to time_consuming_method inside process_files_in_series. Creating these files is just part of the process, not the rate-limiting step.
Also, I am looking for a solution that uses the Queue, Threading, and possibly Condition modules, or anything relevant to those modules. A thread-pool solution may also be helpful. I cannot use processes, only threads.
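One thing worth knowing for the "threads only" constraint: multiprocessing.dummy mirrors the multiprocessing Pool API but is backed entirely by threads, so it provides a ready-made thread pool without any process creation. A quick demonstration (note that, unlike a process pool, a thread pool can even run a lambda, since nothing is pickled):

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, same API

pool = Pool(4)                          # 4 worker *threads*, not processes
squares = pool.map(lambda x: x * x, range(5))
pool.close()
pool.join()
print(squares)  # [0, 1, 4, 9, 16]
```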
I am also looking for a solution in Python 2.7.
Answer
Using a thread pool:
#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)
        # create "file_name"
        ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue() # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)

pool = Pool(number_of_concurrent_jobs)
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
        print args, error
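To see the pattern end to end, here is a self-contained, runnable sketch of the same pool-plus-port-queue design. time_consuming_method is stubbed out with a short sleep (a hypothetical stand-in for the slow UDP call), and the instrumentation exists only to show that concurrent port usage never exceeds len(udp_ports):

```python
import threading
import time
from multiprocessing.dummy import Pool, Queue  # thread pool

udp_ports = [123, 456, 789]
free_udp_ports = Queue()
for port in udp_ports:
    free_udp_ports.put_nowait(port)

in_use = set()           # ports currently held by a job
lock = threading.Lock()
max_in_use = [0]         # peak number of ports in use at once

def time_consuming_method(file_name, udp_port):
    # hypothetical stub for the slow UDP call
    with lock:
        in_use.add(udp_port)
        max_in_use[0] = max(max_in_use[0], len(in_use))
    time.sleep(0.01)
    with lock:
        in_use.discard(udp_port)

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
        time_consuming_method(file_name, udp_port)

def mp_process(filenames):
    udp_port = free_udp_ports.get()  # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

file_arrays = [["a1", "a2"], ["b1"], ["c1", "c2"], ["d1"], ["e1"]]
pool = Pool(5)  # more workers than ports: extras just block on the queue
errors = [err for _, _, err in pool.imap_unordered(mp_process, file_arrays)
          if err is not None]
pool.close()
pool.join()
print(max_in_use[0], errors)
```

Even with five worker threads, max_in_use never exceeds three, because a job cannot start its UDP work until it has taken a port off the queue.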
I don't think you need to bind the number of threads to the number of UDP ports if the processing time may differ for different filename arrays.
If I understand the structure of folder_file_dict correctly, then to generate the filename arrays:
def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array
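For a quick sanity check of that generator, here it is run against a small sample dict (the folder and file names are made up; .values() replaces the Python-2-only .itervalues() so the snippet runs on either version):

```python
def get_files_arrays(folder_file_dict):
    for folder_name_dict in folder_file_dict.values():   # .itervalues() on 2.7
        for filenames_array in folder_name_dict.values():
            yield filenames_array

sample = {
    "folder_a": {"files": ["a1.txt", "a2.txt"]},
    "folder_b": {"files": ["b1.txt"]},
}
arrays = sorted(get_files_arrays(sample))
print(arrays)  # [['a1.txt', 'a2.txt'], ['b1.txt']]
```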