How do I write a futures::Stream to disk without storing it entirely in memory first?

Question

There's an example of downloading a file with Rusoto S3 here: How to save a file downloaded from S3 with Rusoto to my hard drive?

The problem is that it appears to download the whole file into memory and then write it to disk, because it uses the write_all method, which takes a slice of bytes, not a stream. How can I use the StreamingBody, which implements futures::Stream, to stream the file to disk?

Recommended answer

Since StreamingBody implements Stream<Item = Vec<u8>, Error = Error>, we can construct an MCVE that represents that:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

// `dyn` makes the trait object explicit; the bare form is deprecated.
type Error = Box<dyn std::error::Error>;

fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}

We can then get a "streaming body" somehow and use Stream::for_each to process each element in the Stream. Here, we just call write_all with some provided output location:

use std::{fs::File, io::Write};

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}
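
For comparison, the same chunk-at-a-time idea can be sketched without the futures crate, using only the standard library. This is an illustrative analogue rather than part of the original answer: the function name write_chunks is hypothetical, and a plain iterator of Vec<u8> stands in for the stream. Only one chunk is held at a time, and a BufWriter is added on the assumption that chunks may be small enough for batching the writes to help.

```rust
use std::io::{self, BufWriter, Write};

/// Writes each chunk to `out` as it arrives and returns the total byte count.
/// `write_chunks` is a hypothetical helper; the iterator stands in for a stream.
fn write_chunks<I, W>(chunks: I, out: W) -> io::Result<u64>
where
    I: IntoIterator<Item = Vec<u8>>,
    W: Write,
{
    // Buffer small writes so each chunk does not necessarily hit the OS directly.
    let mut out = BufWriter::new(out);
    let mut total = 0u64;

    for chunk in chunks {
        // Only the current chunk is alive here; it is dropped after the write.
        out.write_all(&chunk)?;
        total += chunk.len() as u64;
    }

    // Flush explicitly so that write errors are reported instead of being
    // silently ignored when the BufWriter is dropped.
    out.flush()?;
    Ok(total)
}
```

Passing a std::fs::File as `out` would write to disk; any other Write implementation, such as &mut Vec<u8>, works for testing.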

We can then write a little testing main:

fn main() {
    let mut file = Vec::new();

    {
        let fut = save_to_disk(&mut file);
        fut.wait().expect("Could not drive future");
    }

    assert_eq!(file, b"0123456789ABCDEF");
}

Important notes about the quality of this naïve implementation:

  1. The call to write_all may block, which you should not do in an asynchronous program. It would be better to hand off that blocking work to a thread pool.

  2. The use of Future::wait forces the thread to block until the future is done, which is great for tests but may not be correct for your real use case.
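
One way to picture the hand-off described in the first note is a dedicated writer thread fed through a bounded channel. The sketch below uses only the standard library and is an assumption-laden illustration, not the approach from the linked answers: spawn_writer is a hypothetical name, and a single thread stands in for a thread pool. The bounded channel keeps memory use limited to roughly `capacity` chunks, but SyncSender::send itself blocks when the channel is full, so a real asynchronous program would more likely use an async-aware channel or its runtime's facility for blocking work.

```rust
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

/// Spawns a thread that performs the blocking writes.
/// Returns a sender for chunks and a handle that yields the writer back
/// (or the first I/O error) once every sender has been dropped.
/// `spawn_writer` is a hypothetical helper for illustration only.
fn spawn_writer<W>(
    mut out: W,
    capacity: usize,
) -> (mpsc::SyncSender<Vec<u8>>, thread::JoinHandle<io::Result<W>>)
where
    W: Write + Send + 'static,
{
    // At most `capacity` chunks wait in the channel at any time.
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(capacity);

    let handle = thread::spawn(move || {
        // The loop ends when all senders are dropped and the channel is drained.
        for chunk in rx {
            out.write_all(&chunk)?;
        }
        out.flush()?;
        Ok(out)
    });

    (tx, handle)
}
```

The producer sends each chunk with tx.send(chunk), drops tx when the stream ends, and then joins the handle to learn whether every write succeeded.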

See also:

  • What is the best approach to encapsulate blocking I/O in future-rs?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
