How to refactor readChunk from SFTPFile to stop using inlineCallbacks?

Problem description


I'm trying to read a file over ISFTPFile, and I want to avoid using @inlineCallbacks in this scenario.

Or maybe there is a better way to read from/write to an ISFTPFile?

import hashlib

from twisted.internet import defer

# chunk_size, client, target, and FXF_READ are defined elsewhere in my
# real code, and the last two lines run inside another inlineCallbacks
# generator (hence the bare yield).

@defer.inlineCallbacks
def calculate_checksum(open_file):
    hasher = hashlib.sha256()

    offset = 0
    try:
        while True:
            data = yield open_file.readChunk(offset, chunk_size)
            offset += chunk_size
            hasher.update(data)

    except EOFError:
        pass

    target_checksum = hasher.hexdigest()
    defer.returnValue(target_checksum)


client_file = client.openFile(
    filename=target, flags=FXF_READ, attrs={})
checksum = yield client_file.addCallback(calculate_checksum)

Solution

You effectively want to map sha256.update over an iterator of file chunks:

hasher = hashlib.sha256()
chunks = read_those_chunks()
map(hasher.update, chunks)    # in Python 3 map is lazy, so it would
                              # need list(...) or a loop to consume it
return hasher.hexdigest()

Note that the explicit iteration from the original calculate_checksum (using the while loop) is now hidden inside map. Basically, map has replaced the iteration.

The obstacle is that you want to avoid a read_those_chunks which loads the whole file into memory (presumably). So, as a first step, implement that piece:

def read_those_chunks(open_file, chunk_size):
    # Yield one Deferred per chunk; each fires with the chunk's bytes,
    # or fails with EOFError once the offset passes the end of file.
    offset = 0
    while True:
        yield open_file.readChunk(offset, chunk_size)
        offset += chunk_size

There's a generator that yields Deferreds which fire with successive chunks (or fail with EOFError at end of file). Unfortunately, you can't use this with map. So now implement a map-alike that can deal with this:

def async_map(function, iterable):
    # Visit the elements by recursing instead of iterating.
    try:
        d = next(iterable)
    except StopIteration:
        return

    d.addCallback(function)
    # Returning the recursive call's Deferred chains it onto d, so
    # callbacks added to d later run only after the whole iterable
    # has been consumed.
    d.addCallback(lambda ignored: async_map(function, iterable))
    return d

Since async_map is going to replace map, and map replaced the iteration from the original implementation, async_map is still responsible for making sure we visit every chunk from the iterable. However, iteration (with either for or while) doesn't mix well with Deferred (mixing them is usually what drives you to inlineCallbacks in the first place). So async_map doesn't iterate. It recurses - a common alternative to iteration. Each recursive call operates on the next element of the iterable until there are none left (or until a Deferred fails, as will eventually happen here due to EOFError).

Recursion works better than iteration with Deferred because recursion operates on functions and function calls. Deferred can deal with functions and function calls - pass a function to addCallback and Deferred will eventually call that function. Iteration is made up of small pieces of a function (sometimes called "blocks" or "suites") and Deferred can't deal with these. You can't pass a block to addCallback.
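
To see the shape of this, here is a minimal sketch (assuming only that Twisted is installed) that drives async_map with pre-fired Deferreds standing in for chunk reads:

from twisted.internet import defer

results = []
chunks = iter([defer.succeed(b"a"), defer.succeed(b"b"), defer.succeed(b"c")])
d = async_map(results.append, chunks)

def report(ignored):
    print(results)  # [b'a', b'b', b'c']
d.addCallback(report)  # fires only after every element was visited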

Now use these two to create a Deferred that fires when the digest has been computed:

def calculate_checksum(open_file, chunk_size):
    hasher = hashlib.sha256()
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_map(hasher.update, chunks)
    # readChunk failing with EOFError is how this loop normally ends,
    # so trap it and then report the digest.
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda ignored: hasher.hexdigest())
    return d
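
Hooked up to the question's openFile call, usage might look like this (a sketch: client, target, and FXF_READ come from the question's surrounding SFTP session code, and 32768 is an arbitrary chunk size):

d = client.openFile(filename=target, flags=FXF_READ, attrs={})
d.addCallback(lambda open_file: calculate_checksum(open_file, 32768))

def report(digest):
    print(digest)
d.addCallback(report)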

You may also notice that async_map differs from map in that it doesn't produce a list of results of the function calls it makes. Perhaps it's more like reduce:

def async_reduce(function, iterable, lhs):
    # Like reduce over an iterable of Deferreds: the accumulator (lhs)
    # is threaded through the recursive calls.
    try:
        d = next(iterable)
    except StopIteration:
        return lhs

    d.addCallback(lambda rhs: function(lhs, rhs))
    d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
    return d

It's still recursive instead of iterative, of course.
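
As a quick illustration, again with pre-fired Deferreds standing in for real reads (a sketch assuming only Twisted):

from operator import add

from twisted.internet import defer

numbers = iter([defer.succeed(1), defer.succeed(2), defer.succeed(3)])
d = async_reduce(add, numbers, 0)

def report(total):
    print(total)  # 6
d.addCallback(report)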

And a reducing function for computing the digest looks like this:

def update_hash(hasher, s):
    # hashlib objects mutate in place; return the hasher so it becomes
    # the accumulator for the next call.
    hasher.update(s)
    return hasher

And so calculate_checksum becomes:

def calculate_checksum(open_file, chunk_size):
    hasher = hashlib.sha256()
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_reduce(update_hash, chunks, hasher)
    # A failure discards the value being threaded through the chain, so
    # recover the hasher (mutated in place) from the initial reference.
    def trap_eof(err):
        err.trap(EOFError)
        return hasher
    d.addErrback(trap_eof)
    d.addCallback(lambda hasher: hasher.hexdigest())
    return d

which threads the hasher through the reduction instead of closing over it in every callback (though the EOFError path still needs the initial reference, because a failure discards the value that was flowing through the chain).

Of course, there are also many other ways you could rewrite this function to avoid inlineCallbacks. The way I've chosen doesn't eliminate the use of a generator function so if that's what you wanted to escape it hasn't really helped. If so, perhaps you can decompose the problem as I have done here into different pieces, none of which involve a generator.
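
For instance, the chunk source can be a plain iterator class instead of a generator function. ChunkReader below is a hypothetical name; it keeps the same contract as read_those_chunks (each next() returns a Deferred):

class ChunkReader(object):
    def __init__(self, open_file, chunk_size):
        self.open_file = open_file
        self.chunk_size = chunk_size
        self.offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        # A Deferred that fires with the next chunk or fails with
        # EOFError, exactly like the generator version.
        d = self.open_file.readChunk(self.offset, self.chunk_size)
        self.offset += self.chunk_size
        return d

    next = __next__  # Python 2 spelling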
