尝试将文件下载缓冲区拆分为单独的线程 [英] trying to split the file download buffer to into separate threads

查看:25
本文介绍了尝试将文件下载缓冲区拆分为单独的线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将文件缓冲区下载到 5 个线程中,但似乎出现了乱码.

I am trying to download the buffer of file into 5 threads but it seems like it's getting garbled.

from numpy import arange
import requests
from threading import Thread
import urllib2

url = 'http://pymotw.com/2/urllib/index.html'

# HEAD with identity encoding so Content-Length reflects the raw byte count.
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers['content-length']

splitBy = 5

# Float chunk boundaries; converted to non-overlapping integer ranges below.
splits = arange(splitBy + 1) * (float(sizeInBytes) / splitBy)

# Pre-sized list so each thread writes its own slot. Appending from multiple
# threads stores chunks in completion order, which garbled the output.
dataLst = [None] * splitBy


def bufferSplit(url, idx, splits):
    # HTTP Range is inclusive at both ends, so end one byte before the next
    # chunk's start -- the original ranges shared a boundary byte.
    byteRange = 'bytes=%d-%d' % (int(splits[idx]), int(splits[idx + 1]) - 1)
    req = urllib2.Request(url, headers={'Range': byteRange})
    print(byteRange)
    # Store by index to preserve chunk order regardless of completion order.
    dataLst[idx] = urllib2.urlopen(req).read()


threads = [Thread(target=bufferSplit, args=(url, idx, splits)) for idx in range(splitBy)]
for th in threads:
    th.start()
# Wait for every download to finish -- the original printed and joined
# dataLst while the worker threads were still running.
for th in threads:
    th.join()

print(dataLst)

# Binary mode: text mode translates newlines on some platforms and corrupts
# non-text payloads.
with open('page.html', 'wb') as fh:
    fh.write(''.join(dataLst))

更新:我又研究了一下,取得了一点进展,但是如果下载一个 jpg 文件,它似乎已损坏;

Update: So I worked over and got little but progress, however if I download a jpg it seems to be corrupted;

from numpy import arange
import os
import requests
import threading
import urllib2

# url ='http://s1.fans.ge/mp3/201109/08/John_Legend_So_High_Remix(fans_ge).mp3'
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
# url = 'http://pymotw.com/2/urllib/index.html'
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)


splitBy = 5

dataLst = []


class ThreadedFetch(threading.Thread):
    """Worker thread that downloads ``url`` in ``splitBy`` ranged requests.

    The chunks are fetched with inclusive, non-overlapping integer HTTP
    Range headers and appended in order, so :meth:`getFileData` returns
    the payload bytes exactly once each, in the right order.
    """

    def __init__(self, url, fileName, splitBy=5):
        super(ThreadedFetch, self).__init__()
        self.__url = url          # source URL to download
        self.__spl = splitBy      # number of ranged requests to issue
        self.__dataLst = []       # downloaded chunks, in request order
        self.__fileName = fileName  # destination name (kept for callers)

    def run(self):
        # Determine the size of *this* instance's URL instead of relying on
        # the module-global ``sizeInBytes`` computed for one fixed URL.
        size = requests.head(
            self.__url, headers={'Accept-Encoding': 'identity'}
        ).headers.get('content-length', None)
        if not size:
            print("Size cannot be determined.")
            return
        size = int(size)
        # Integer, non-overlapping inclusive ranges covering 0 .. size-1.
        # The original float boundaries (truncated by %d) made adjacent
        # chunks share a boundary byte, duplicating data -- the source of
        # the corrupted jpg.
        for idx in range(self.__spl):
            start = idx * size // self.__spl
            end = (idx + 1) * size // self.__spl - 1
            req = urllib2.Request(
                self.__url, headers={'Range': 'bytes=%d-%d' % (start, end)})
            self.__dataLst.append(urllib2.urlopen(req).read())

    def getFileData(self):
        # Chunks were appended in request order, so concatenation is correct.
        return ''.join(self.__dataLst)


fileName = url.split('/')[-1]

dl = ThreadedFetch(url, fileName)
dl.start()
# Block until the worker thread has fetched every chunk before reading it.
dl.join()
content = dl.getFileData()
if content:
    # 'wb', not 'w': text mode translates newline bytes on some platforms,
    # which is exactly what corrupted the downloaded jpg.
    with open(fileName, 'wb') as fh:
        fh.write(content)
    print("Finished Writing file %s" % fileName)

下面是下载后的图片.

推荐答案

这是该项目的另一个版本.差异:

Here's another version of the project. Differences:

  • 线程代码是一个单独的小函数

  • thread code is a single small function

每个线程下载一个块,然后将其存储在一个全局线程安全字典中

each thread downloads a chunk, then stores it in a global threadsafe dictionary

线程启动,然后join()ed -- 它们都同时运行

threads are started, then join()ed -- they're all running at once

完成后,数据按正确顺序重新组合,然后写入磁盘

when all done, data is reassembled in correct order then written to disk

额外打印,以验证一切正确

extra printing, to verify everything's correct

计算输出文件大小,用于额外比较

output file size is calculated, for an extra comparison

import os, requests
import threading
import urllib2
import time

URL = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"

def buildRange(value, numsplits):
    """Split ``value`` bytes into ``numsplits`` inclusive HTTP Range strings.

    Returns e.g. ``['0-4', '5-9']`` for ``value=10, numsplits=2``.  The
    ranges are contiguous, non-overlapping, and together cover byte
    indices ``0 .. value - 1`` exactly.  The original float/round
    arithmetic asked for byte index ``value`` (one past the last byte)
    in the final chunk.
    """
    ranges = []
    for i in range(numsplits):
        # Chunk i covers [i*value//n, (i+1)*value//n - 1]; integer floor
        # division guarantees adjacent chunks never share a byte.
        start = i * value // numsplits
        end = (i + 1) * value // numsplits - 1
        ranges.append('%s-%s' % (start, end))
    return ranges

def main(url=None, splitBy=3):
    """Download ``url`` in ``splitBy`` parallel chunks and write it to disk.

    Prints progress/diagnostic info; writes the reassembled file next to
    the script under the URL's basename.  Returns None.
    """
    start_time = time.time()
    if not url:
        print("Please Enter some url to begin download.")
        return

    fileName = url.split('/')[-1]
    # identity encoding so Content-Length reflects the raw byte count.
    sizeInBytes = requests.head(
        url, headers={'Accept-Encoding': 'identity'}
    ).headers.get('content-length', None)
    print("%s bytes to download." % sizeInBytes)
    if not sizeInBytes:
        print("Size cannot be determined.")
        return

    # idx -> chunk bytes; per-key dict assignment is atomic under the GIL,
    # so the worker threads need no explicit lock.
    dataDict = {}

    # split total num bytes into ranges
    ranges = buildRange(int(sizeInBytes), splitBy)

    def downloadChunk(idx, irange):
        # Fetch one inclusive byte range and record it under its index.
        req = urllib2.Request(url)
        req.headers['Range'] = 'bytes={}'.format(irange)
        dataDict[idx] = urllib2.urlopen(req).read()

    # create one downloading thread per chunk
    downloaders = [
        threading.Thread(target=downloadChunk, args=(idx, irange))
        for idx, irange in enumerate(ranges)
    ]

    # start threads, let run in parallel, wait for all to finish
    for th in downloaders:
        th.start()
    for th in downloaders:
        th.join()

    print('done: got {} chunks, total {} bytes'.format(
        len(dataDict), sum(len(chunk) for chunk in dataDict.values())))

    print("--- %s seconds ---" % str(time.time() - start_time))

    if os.path.exists(fileName):
        os.remove(fileName)
    # Reassemble in index order.  'wb', not 'w': text mode translates
    # newline bytes on some platforms and corrupts binary files (the jpg).
    with open(fileName, 'wb') as fh:
        for _idx, chunk in sorted(dataDict.items()):
            fh.write(chunk)

    print("Finished Writing file %s" % fileName)
    print('file size {} bytes'.format(os.path.getsize(fileName)))

# Run the demo download only when executed as a script, not on import.
if __name__ == '__main__':
    main(URL)

输出

102331 bytes to download.
done: got 3 chunks, total 102331 bytes
--- 0.380599021912 seconds ---
Finished Writing file 607800main_kepler1200_1600-1200.jpg
file size 102331 bytes

这篇关于尝试将文件下载缓冲区拆分为单独的线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆