Upload large file using multiple connections/threads to an SFTP server with Python Paramiko


Question

I am trying to SFTP a file to a remote server in chunks, using threads and the Python Paramiko library.

It opens a local file and SFTPs the chunks to the remote server in different threads.

I am basically following this solution, which uses the same approach to download a large file over SFTP; I would like to send large files instead: Downloading solution

However, in write_chunks(), on the line for chunk in infile.readv(chunks):, I am getting this error:

AttributeError: '_io.BufferedReader' object has no attribute 'readv'

Could anybody assist with this error, please? I thought that infile was a file descriptor; I don't understand why it is an _io.BufferedReader object.

import threading, os, time, paramiko

MAX_RETRIES = 10

ftp_server = "server.com"
port = 22
remote_file = "/home/filecopy.bin"
local_file = "/home/file.bin"
ssh_conn = sftp_client = None
username = "none"
password = "none"

# you could make the number of threads relative to file size
NUM_THREADS = 2

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, remote_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(remote_file_part, "wb") as outfile:
                with open(local_file, "rb") as infile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        #filesize = sftp_client.stat(remote_file).st_size
        filesize = os.stat(local_file).st_size
        sftp_client.close()
        ssh_conn.close()
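        # 4 KiB chunks; build a list of (offset, size) tuples covering the file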
        chunksize = pow(2, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            remote_file_part = make_filepart_path(remote_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, remote_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(remote_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with sftp_client.open(remote_file_part, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
    finally:
       if hasattr(sftp_client, "close") and callable(sftp_client.close):
           sftp_client.close()
       if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
           ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))

Stack trace:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "simpleNNInference.py", line 210, in write_chunks
    for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "simpleNNInference.py", line 210, in write_chunks
    for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'

Answer

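As for the AttributeError itself: the downloading solution you followed calls readv() on a Paramiko SFTPFile, which accepts a list of (offset, length) tuples, while the builtin open() returns an io.BufferedReader, which has no readv() method. A minimal local stand-in using seek() and read() could look like this (the helper name readv_local is illustrative, not from the original code):

def readv_local(infile, chunks):
    # replicate SFTPFile.readv() for a local file: for each
    # (offset, length) tuple, seek to the offset and yield that slice
    for offset, length in chunks:
        infile.seek(offset)
        yield infile.read(length)
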
For an example of how to do a parallel multipart upload of one large file, see the code below.

Note that most SFTP servers (including OpenSSH) do not allow merging files remotely, so you have to resort to a shell command for that.

import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)

def open_ssh():
    ssh = paramiko.SSHClient()
    # accept the server's host key automatically, as in the second example
    # below; load known_hosts instead for production use
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(sftp_server, username=username, password=password)
    return ssh

def upload_part(num, offset, part_size, remote_path_part):
    print(f"Running thread {num}")
    try:
        ssh = open_ssh()
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
            with sftp.open(remote_path_part, "wb") as fr:
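            # pipelining: write() does not wait for a server confirmation
            # of each request, which greatly improves upload throughput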
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")

print("Starting")
offset = 0
threads = []
part_filenames = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    remote_path_part = f"{remote_path}.{num}"
    args = (num, offset, part_size, remote_path_part)
    print(f"Starting thread {num} offset {offset} size {part_size} " +
          f"part name {remote_path_part}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    part_filenames.append(remote_path_part)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All threads done")

parts_list = " ".join(part_filenames)
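# the shell command removes any previous target file, then appends each
# part to the target and deletes the part, stopping at the first failure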
merge_command = \
    f"rm \"{remote_path}\" 2> /dev/null ; " + \
    f"for i in {parts_list} ; do cat \"$i\" >> {remote_path} && " + \
     "rm \"$i\" || break ; done"
print(f"Merge command: {merge_command}")

ssh = open_ssh()
stdin, stdout, stderr = ssh.exec_command(merge_command)
print(stdout.read().decode("utf-8"))
print(stderr.read().decode("utf-8"))

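After the merge, a quick sanity check can be useful. This is a sketch that is not part of the original answer; it reuses the ssh connection from the merge step and compares the remote file size with the local one:

sftp = ssh.open_sftp()
remote_size = sftp.stat(remote_path).st_size
local_size = os.path.getsize(local_path)
# the sizes should match once all parts were appended and removed
print(f"Remote size {remote_size}, local size {local_size}: " +
      ("OK" if remote_size == local_size else "MISMATCH"))
sftp.close()
ssh.close()
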

I'm not sure how much of that is backed by the SFTP specification, but many SFTP servers, including OpenSSH, allow writing to the same file from multiple connections in parallel. So you can even do without merging the files, by uploading directly to the respective parts of the target file:

import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)
lock = threading.Lock()
created = False

def upload_part(num, offset, part_size):
    print(f"Running thread {num}")
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(sftp_server, username=username, password=password)
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
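            # the first thread to get here creates/truncates the remote file
            # ("w"); later threads open it for update ("r+") so they do not
            # wipe out data already written by the others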
            with lock:
                global created
                m = "r+" if created else "w"
                created = True
                fr = sftp.open(remote_path, m)
            with fr:
                fr.seek(offset)
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")

print("Starting")
offset = 0
threads = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    args = (num, offset, part_size)
    print(f"Starting thread {num} offset {offset} size {part_size}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All threads done")

