Python multi-threaded processing with limited CPU/ports

Question

I have a dictionary of folder names that I would like to process in parallel. Under each folder, there is an array of file names that I would like to process in series:

folder_file_dict = {
         folder_name : {
                         file_names_key : [file_names_array]
                       }
        }

Ultimately, I will create a folder named folder_name containing len(folder_file_dict[folder_name][file_names_key]) files. I have a method like so:

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

Note the time_consuming_method() above, which takes a long time due to calls over a UDP port. I am also limited to using the UDP ports in the array above. Thus, I have to wait for time_consuming_method to complete on a UDP port before I can use that UDP port again. This means that I can only have len(udp_ports) threads running at a time.

Thus, I will ultimately create len(folder_file_dict.keys()) threads, with len(folder_file_dict.keys()) calls to process_files_in_series. I also have a MAX_THREAD count. I am trying to use the Queue and Threading modules, but I am not sure what kind of design I need. How can I do this using Queues and Threads, and possibly Conditions as well? A solution that uses a thread pool may also be helpful.

Note

I am not trying to increase the read/write speed. I am trying to parallelize the calls to time_consuming_method under process_files_in_series. Creating these files is just part of the process, not the rate-limiting step.

Also, I am looking for a solution that uses the Queue, Threading, and possibly Condition modules, or anything related to those modules. A thread pool solution may also be helpful. I cannot use processes, only threads.

I am also looking for a solution in Python 2.7.

Answer

Use a thread pool:

#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"
         ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue() # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)
pool = Pool(number_of_concurrent_jobs)
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
        print args, error

I don't think you need to bind the number of threads to the number of UDP ports if the processing time may differ across filename arrays.
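A minimal runnable sketch of that decoupling is below: more worker threads than ports, with the blocking queue guaranteeing that no two threads use a port at the same time. The port numbers, file lists, and the time.sleep stand-in for time_consuming_method are all illustrative.

```python
from multiprocessing.dummy import Pool, Queue  # thread-based Pool
import time

# Three "ports" shared by five workers; extras block on get().
free_udp_ports = Queue()
for port in [123, 456, 789]:
    free_udp_ports.put_nowait(port)

def process(filenames):
    udp_port = free_udp_ports.get()      # blocks until a port is free
    try:
        for name in filenames:
            time.sleep(0.01)             # stand-in for time_consuming_method
        return filenames, udp_port
    finally:
        free_udp_ports.put_nowait(udp_port)  # hand the port back

pool = Pool(5)  # more workers than ports is fine
results = pool.map(process, [["a", "b"], ["c"], ["d"], ["e"], ["f"]])
pool.close()
pool.join()
```

After the pool drains, all three ports are back in the queue, ready for the next batch.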

If I understand the structure of folder_file_dict correctly, you can generate the filename arrays like this:

def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array
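If you would rather stay with the Queue and threading modules directly, as the question asks, the same pattern can be written as a fixed pool of worker threads pulling tasks from a queue. This is a sketch with a dummy task body (the real code would call process_files_in_series where the comment indicates); MAX_THREAD, the port list, and the file lists are illustrative.

```python
import threading
try:
    import Queue as queue   # Python 2
except ImportError:
    import queue            # Python 3

tasks = queue.Queue()
free_udp_ports = queue.Queue()
for port in [123, 456, 789]:
    free_udp_ports.put(port)

results = []
results_lock = threading.Lock()

def worker():
    while True:
        filenames = tasks.get()
        if filenames is None:                # sentinel: no more work
            return
        udp_port = free_udp_ports.get()      # wait for a free port
        try:
            # process_files_in_series(filenames, udp_port) would go here
            with results_lock:
                results.append((tuple(filenames), udp_port))
        finally:
            free_udp_ports.put(udp_port)     # release the port

MAX_THREAD = 4
threads = [threading.Thread(target=worker) for _ in range(MAX_THREAD)]
for t in threads:
    t.start()
for filenames in [["a"], ["b", "c"], ["d"], ["e"]]:
    tasks.put(filenames)
for _ in threads:
    tasks.put(None)                          # one sentinel per worker
for t in threads:
    t.join()
```

Since the queue is FIFO, every real task is dequeued before the sentinels, so all work completes before the workers exit.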
