Python 3 - How to properly set up this multiprocessing job?

Problem Description

I have a file with about 10,000 rows; each row represents the parameters for a download job. I have 5 custom downloaders. Each job can take anywhere from 5 seconds to 2 minutes. How would I create something that iterates through the 10,000 rows and assigns each job to a downloader that isn't currently working?

The difficult part for me is that each Downloader is an instance of a class, and the difference between the instances is the port_number I specify when I instantiate each of the 5 Downloader objects. So I have a = Downloader(port_number=7751) ... e = Downloader(port_number=7755). Then, if I were to use a Downloader, I would do a.run(row).

How do I define the workers as these instances a, b, c, d, e rather than as a downloader function?

Recommended Answer

There are many ways to do it. The simplest is to just use multiprocessing.Pool and let it organize the workers for you. 10k rows is not all that much; even if an average URL were a full kilobyte long, the whole list would still take only about 10 MB of memory, and memory is cheap.

So, just read the file into memory and map it onto a multiprocessing.Pool to do your bidding:

from multiprocessing import Pool

def downloader(param):  # our downloader process
    # download code goes here
    # param holds one line from your file (including the trailing newline, so strip before use)
    # e.g. res = requests.get(param.strip())
    return True  # let's provide some response back

if __name__ == "__main__":  # important protection for cross-platform use

    with open("your_file.dat", "r") as f:  # open your file
        download_jobs = f.readlines()  # store each line in a list

    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(downloader, download_jobs)  # map our data, line by line
    download_pool.close()  # let's exit cleanly
    # you can check the responses for each line in the `responses` list
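Because map() preserves input order, you can pair each job with its result afterwards. An illustrative follow-up (the retry idea is an assumption, not part of the original answer):

failed_jobs = [job for job, ok in zip(download_jobs, responses) if not ok]
print("{} of {} jobs failed".format(len(failed_jobs), len(download_jobs)))
# failed_jobs could then be fed into a fresh Pool for a retry pass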

You can also use threading instead of multiprocessing (multiprocessing.pool.ThreadPool is a drop-in replacement) to do everything within a single process if you need shared memory. Downloading is I/O-bound, so a single process with multiple threads is more than enough unless you're doing heavy additional processing.
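A minimal sketch of that thread-based variant, reusing the placeholder downloader function and file name from the example above:

from multiprocessing.pool import ThreadPool  # thread-based drop-in for Pool

if __name__ == "__main__":

    with open("your_file.dat", "r") as f:  # open your file
        download_jobs = f.readlines()  # store each line in a list

    download_pool = ThreadPool(processes=5)  # 5 worker threads in one process
    responses = download_pool.map(downloader, download_jobs)  # same API as Pool
    download_pool.close()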

If you want your downloaders to run as class instances, you can turn the downloader function into a factory for your Downloader instances, and then just pass whatever is needed to instantiate those instances alongside your URLs. Here is a simple round-robin approach:

from itertools import cycle
from multiprocessing import Pool

class Downloader(object):

    def __init__(self, port_number=8080):
        self.port_number = port_number

    def run(self, url):
        print("Downloading {} on port {}".format(url, self.port_number))

def init_downloader(params):  # our downloader initializer
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return True  # you can return a meaningful response here instead

if __name__ == "__main__":  # important protection for cross-platform use

    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]

    downloader_cycle = cycle(downloader_params)  # use cycle for round-robin distribution
    with open("your_file.dat", "r") as f:  # open your file
        # read our file line by line and attach downloader params to it
        download_jobs = [[next(downloader_cycle), row.strip()] for row in f]

    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(init_downloader, download_jobs)  # map our data
    download_pool.close()  # let's exit cleanly
    # you can check the responses for each line in the `responses` list
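For illustration, with the three parameter dicts above, cycle pairs them with the rows round-robin style (the URLs here are made up):

download_jobs[0]  # [{"port_number": 7751}, "http://example.com/a"]
download_jobs[1]  # [{"port_number": 7851}, "http://example.com/b"]
download_jobs[2]  # [{"port_number": 7951}, "http://example.com/c"]
download_jobs[3]  # [{"port_number": 7751}, "http://example.com/d"] - the cycle wraps around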

Keep in mind that this is not the most balanced solution, as two Downloader instances with the same port can happen to run at the same time, but it will average out over a large enough data set.

If you want to make sure that you don't have two Downloader instances running off of the same port, you'll either need to build your own pool, or you'll need to create a central process that will issue ports to your Downloader instances when they need them.
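One way to sketch that central coordination is a shared queue of free ports: a worker checks a port out before downloading and hands it back afterwards, so no two workers can hold the same port at the same time. A minimal sketch, assuming the port numbers and file name below as placeholders:

from multiprocessing import Pool, Manager

class Downloader(object):

    def __init__(self, port_number=8080):
        self.port_number = port_number

    def run(self, url):
        print("Downloading {} on port {}".format(url, self.port_number))

port_queue = None  # set per worker process by the pool initializer

def init_worker(queue):
    global port_queue
    port_queue = queue

def download_job(url):
    port = port_queue.get()  # check out a free port (blocks if none is free)
    try:
        Downloader(port_number=port).run(url)
        return True
    finally:
        port_queue.put(port)  # hand the port back for reuse

if __name__ == "__main__":

    manager = Manager()
    free_ports = manager.Queue()
    for port in (7751, 7752, 7753, 7754, 7755):
        free_ports.put(port)

    with open("your_file.dat", "r") as f:
        download_jobs = [row.strip() for row in f]

    download_pool = Pool(processes=5, initializer=init_worker, initargs=(free_ports,))
    responses = download_pool.map(download_job, download_jobs)
    download_pool.close()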
