How do I download a large list of URLs in parallel in pyspark?


Problem description

I have an RDD containing 10000 URLs to be fetched.

list = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
        'http://google.com',
        'http://twitter.com']
urls = sc.parallelize(list)

I need to check which URLs are broken, and preferably fetch the results into a corresponding RDD in Python. I tried this:

import asyncio
import concurrent.futures
import requests

async def get(url):

    with concurrent.futures.ThreadPoolExecutor() as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                i
            )
            for i in url
        ]
        return futures

async def get_response(futures):
    response = await asyncio.gather(futures,return_exceptions=True)
    return(response)

tasks = urls.map(lambda query: get(query)) # Method returns http call response as a Future[String]

results = tasks.map(lambda task: get_response(task) )
results = results.map(lambda response:'ERR' if isinstance(response, Exception) else 'OK' )
results.collect()

I get the following output, which obviously is not right:

['OK', 'OK', 'OK']

I also tried this:

import asyncio
import concurrent.futures
import requests

async def get():

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                i
            )
            for i in urls.toLocalIterator()
        ]
        for response in await asyncio.gather(*futures,return_exceptions=True):
            print('{}: {}'.format(response, 'ERR' if isinstance(response, Exception) else 'OK'))
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(get())

I get the following output:

HTTPConnectionPool(host='SDFKHSKHGKLHSKLJHGSDFKSJH.com', port=80): Max retries exceeded with url: / (Caused by 
NewConnectionError('<urllib3.connection.HTTPConnection object at 0x12c834210>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known')): ERR
<Response [200]>: OK
<Response [200]>: OK

Desired output would be something like this:

http://SDFKHSKHGKLHSKLJHGSDFKSJH.com : ERR
http://google.com : OK
http://twitter.com : OK

But the problem with the second approach is that it uses a plain list to store the future objects. I believe an RDD is better, since the number of URLs can be in the millions or billions and no single machine can handle that. It is also not clear to me how to retrieve the URLs from the responses.
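
A rough, untested sketch of what I have in mind, assuming requests is installed on every executor, would keep the fetching inside the RDD:

import requests

def check_partition(part):
    # runs on the executors: fetch every URL in this partition
    # and yield (url, status) pairs
    for url in part:
        try:
            requests.get(url, timeout=10).raise_for_status()
            yield (url, 'OK')
        except Exception:
            yield (url, 'ERR')

statuses = urls.mapPartitions(check_partition)
statuses.collect()  # or saveAsTextFile(...) when the list is huge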

Answer

If you're using concurrent.futures, you don't need asyncio at all (it will bring you no benefit since you are running in multiple threads anyway). You can use concurrent.futures.wait() to wait for multiple futures in parallel.

I can't test your data, but it should work with code like this:

import concurrent.futures, requests

def get_one(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text

def get_all():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(get_one, url)
                   for url in urls.toLocalIterator()]
    # the end of the "with" block will automatically wait
    # for all of the executor's tasks to complete

    for fut in futures:
        if fut.exception() is not None:
            print('{}: {}'.format(fut.exception(), 'ERR'))
        else:
            print('{}: {}'.format(fut.result(), 'OK'))
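
If you also want the output keyed by URL, as in your desired output, one untested variant is to remember which future belongs to which URL and wait on them explicitly with concurrent.futures.wait() (the name future_to_url here is just for illustration):

import concurrent.futures, requests

def check_all():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # remember which URL each future belongs to
        future_to_url = {executor.submit(get_one, url): url
                         for url in urls.toLocalIterator()}
    # explicitly wait for every future to finish
    done, _ = concurrent.futures.wait(future_to_url)
    for fut in done:
        status = 'ERR' if fut.exception() is not None else 'OK'
        print('{}: {}'.format(future_to_url[fut], status))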

To do the same thing with asyncio, you should use aiohttp instead.
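
For reference, a rough aiohttp sketch of the same check might look like this (untested, names are illustrative, and it still pulls the URLs to the driver with toLocalIterator()):

import asyncio
import aiohttp

async def check(session, url):
    try:
        async with session.get(url) as resp:
            resp.raise_for_status()
        return url, 'OK'
    except Exception:
        return url, 'ERR'

async def check_all_async(url_list):
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(check(session, u) for u in url_list))

loop = asyncio.get_event_loop()
results = loop.run_until_complete(check_all_async(list(urls.toLocalIterator())))
for url, status in results:
    print('{}: {}'.format(url, status))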
