How to use multiprocessing with requests module?


Problem Description

I am a new developer in Python. My code is below:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def run(self):
        with open('ips.txt', 'r') as urls:
            for url in urls.readlines():
                req = url.strip()
                try:
                    page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                        timeout=10)
                    soup = BS(page.text)
                    # string = string.encode('ascii', 'ignore')
                    print('\033[32m' + req + ' - Title: ', soup.title)
                except requests.RequestException as e:
                    print('\033[32m' + req + ' - TimeOut!')
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

I am trying to make the program read ips.txt and print out the title of each website.

It works flawlessly in a single thread. Now I want to make it faster by using multiprocessing.

But for some reason it just outputs the same line 5 times. I am new to multiprocessing and my attempts to fix it have failed.

Screenshot showing the problem:

I just want to run 5 workers to check ips.txt with multithreading or in parallel... I just want to make it faster.

Any hint, clue, help?

Solution

Issue

The primary issue in your code is that each Worker opens ips.txt from scratch and works on each URL found in ips.txt. Thus the five workers together open ips.txt five times and work on each URL five times.

The right way to solve this problem is to split the code into master and worker. You already have most of the worker code implemented. Let us treat the main section (under if __name__ == '__main__':) as the master for now.

Now the master is supposed to launch five workers and send work to them via a queue (multiprocessing.Queue).

The multiprocessing.Queue class offers a way for multiple producers to put data into it and multiple consumers to read data from it without running into race conditions. It implements all the locking semantics necessary to exchange data safely in a multiprocessing context.
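
To make the queue mechanics concrete, here is a minimal standalone sketch, separate from the solution below and using only the standard library (the names consumer, q and item are purely illustrative), in which two consumer processes drain one multiprocessing.Queue that the main process fills:

import multiprocessing


def consumer(q):
    # Keep taking items off the shared queue until the None sentinel arrives.
    while True:
        item = q.get()
        if item is None:
            break
        print('consumed:', item)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    consumers = [multiprocessing.Process(target=consumer, args=(q,))
                 for _ in range(2)]
    for c in consumers:
        c.start()

    # The main process acts as the producer.
    for item in ['a', 'b', 'c', 'd']:
        q.put(item)

    # One sentinel per consumer so that every consumer quits.
    for _ in consumers:
        q.put(None)

    for c in consumers:
        c.join()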

Here is how your code could be rewritten as per what I've described above:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def __init__(self, job_queue):
        super().__init__()
        self._job_queue = job_queue

    def run(self):
        while True:
            url = self._job_queue.get()
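            # A None value is the sentinel that tells this worker to quit.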
            if url is None:
                break

            req = url.strip()

            try:
                page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                    timeout=10)
                soup = BS(page.text)
                # string = string.encode('ascii', 'ignore')
                print('\033[32m' + req + ' - Title: ', soup.title)
            except requests.RequestException as e:
                print('\033[32m' + req + ' - TimeOut!')


if __name__ == '__main__':
    jobs = []
    job_queue = multiprocessing.Queue()

    for i in range(5):
        p = Worker(job_queue)
        jobs.append(p)
        p.start()

    # This is the master code that feeds URLs into queue.
    with open('ips.txt', 'r') as urls:
        for url in urls.readlines():
            job_queue.put(url)

    # Send None for each worker to check and quit.
    for j in jobs:
        job_queue.put(None)

    for j in jobs:
        j.join()

We can see in the above code that the master opens ips.txt once, reads the URLs from it one by one and puts them into the queue. Each worker waits for a URL to arrive on this queue. As soon as a URL arrives on the queue, one of the workers picks it up and gets busy. If there are more URLs in the queue, the next free worker picks the next one up and so on.

Finally, we need some way for the workers to quit when all work is done. There are several ways to achieve this. In this example, I have chosen a simple strategy of sending five sentinel values (five None values in this case) into the queue, one for each worker, so that each worker can pick this up and quit.

There is another strategy where the workers and the master share a multiprocessing.Event object just like they share a multiprocessing.Queue object right now. The master invokes the set() method of this object whenever it wants the workers to quit. The workers check if this object is_set() and quit. However, this introduces some additional complexity into the code. I've discussed this below.
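
As a quick illustration of the bare Event mechanics, separate from the complete example further down (the names worker and stop_event are purely illustrative), a minimal sketch could look like this:

import multiprocessing
import time


def worker(stop_event):
    # Keep doing (fake) work until the master sets the stop event.
    while not stop_event.is_set():
        print('working...')
        time.sleep(0.5)
    print('stop event is set; quitting')


if __name__ == '__main__':
    stop_event = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(stop_event,))
    p.start()

    time.sleep(2)     # let the worker run for a while
    stop_event.set()  # ask the worker to quit
    p.join()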

For the sake of completeness and also for the sake of demonstrating minimal, complete, and verifiable examples, I am presenting two code examples below that show both stopping strategies.

This is pretty much what I have described above so far except that the code example has been simplified a lot to remove dependencies on any libraries outside the Python standard library.

Another thing worth noting in the example below is that instead of creating a worker class, we use a worker function and create a Process out of it. This type of code is often found in the Python documentation and it is quite idiomatic.

import multiprocessing
import time
import random


def worker(input_queue):
    while True:
        url = input_queue.get()

        if url is None:
            break

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.Queue()
    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(input_queue, ))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Ask the workers to quit.
    for w in workers:
        input_queue.put(None)

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()

Using Event to Stop Workers

Using a multiprocessing.Event object to signal when workers should quit introduces some complexity into the code. There are primarily four changes that have to be made:

  • In the master, we invoke the set() method on the Event object to signal that workers should quit as soon as possible.
  • In the worker, we invoke the is_set() method of the Event object periodically to check if it should quit.
  • In the master, we need to use multiprocessing.JoinableQueue instead of multiprocessing.Queue so that it can test if the queue has been consumed completely by the workers before it asks the workers to quit.
  • In the worker, we need to invoke the task_done() method of the queue after every item from the queue is consumed. This is necessary for the master to invoke the join() method of the queue to test if it has been emptied.

All of these changes can be found in the code below:

import multiprocessing
import time
import random
import queue


def worker(input_queue, stop_event):
    while not stop_event.is_set():
        try:
            # Check if any URL has arrived in the input queue. If not,
            # loop back and try again.
            url = input_queue.get(True, 1)
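            # Mark the item as done right away so that input_queue.join()
            # in the master can return once the queue has been drained.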
            input_queue.task_done()
        except queue.Empty:
            continue

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.JoinableQueue()
    stop_event = multiprocessing.Event()

    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker,
                                    args=(input_queue, stop_event))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Wait for the queue to be consumed.
    input_queue.join()

    # Ask the workers to quit.
    stop_event.set()

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
