将AsyncRead转换为TryStream字节的最佳方法是什么? [英] What is the best way to convert an AsyncRead to a TryStream of bytes?

查看:197
本文介绍了将AsyncRead转换为TryStream字节的最佳方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个AsyncRead,想将其转换为带有tokio 0.2和期货0.3的Stream<Item = tokio::io::Result<Bytes>>.

I have an AsyncRead and want to convert it to a Stream<Item = tokio::io::Result<Bytes>> with tokio 0.2 and futures 0.3.

我能做的最好的事情是这样的:

The best I've been able to do is something like:

use bytes::Bytes; // 0.4.12
use futures::stream::{Stream, TryStreamExt};; // 0.3.1
use tokio::{fs::File, io::Result}; // 0.2.4
use tokio_util::{BytesCodec, FramedRead}; // 0.2.0

#[tokio::main]
async fn main() -> Result<()> {
    let file = File::open("some_file.txt").await?;
    let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
    fn_that_takes_stream(stream)
}

fn fn_that_takes_stream<S, O>(s: S) -> Result<()>
where
    S: Stream<Item = Result<Bytes>>,
{
    //...
    Ok(())
}

似乎应该有一个更简单的方法;我很惊讶Tokio没有包含一个编解码器来获取Bytes而不是BytesMut的流,或者仅仅是扩展特性提供了一种将AsyncRead转换为Stream的方法.我想念什么吗?

It seems like there should be a simpler way; I'm surprised Tokio doesn't include a codec to get a stream of Bytes instead of BytesMut or that there isn't just an extension trait that provides a method to convert an AsyncRead into a Stream. Am I missing something?

推荐答案

关于futures-0.3板条箱中定义的AsyncReadstream::*,有

Regarding AsyncRead and stream::* as defined in futures-0.3 crate, there is

fn stream::TryStreamExt::into_async_read(self) -> IntoAsyncRead<Self>

,但并非相反.这种差异很烦人,希望可以在async/await生态系统的重要包装箱达到1.0之前解决.目前,我已经看到了几种自行完成的方法:

but not the other way around. This discrepancy is annoying and hopefully can be addressed before important crates of async/await ecosystem hit 1.0. For now, I've seen several ways to do it oneself:

    托管AsyncRead

实现Stream特征

  • 使用futures实用程序功能,例如future::poll_fn

    make use of futures utility functions such as future::poll_fn

    OP的方法

    IMO第三个是最直接的.这是一些工作代码:

    IMO the third is the most straightforward. Here is some working code:

    //# bytes = "0.5.3"
    //# futures = "0.3.1"
    //# tokio = { version = "0.2.4", features = ["full"] }
    //# tokio-util = { version = "0.2.0", features = ["codec"] }
    use bytes::Bytes;
    use futures::stream::{self, Stream, StreamExt, TryStreamExt};
    use tokio::io::{AsyncRead, Result};
    use tokio_util::codec;
    
    fn into_byte_stream<R>(r: R) -> impl Stream<Item=Result<u8>>
    where
        R: AsyncRead,
    {
        codec::FramedRead::new(r, codec::BytesCodec::new())
            .map_ok(|bytes| stream::iter(bytes).map(Ok))
            .try_flatten()
    }
    
    fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
    where
        R: AsyncRead,
    {
        codec::FramedRead::new(r, codec::BytesCodec::new())
            .map_ok(|bytes| bytes.freeze())
    }
    
    #[tokio::main]
    async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let reader = std::io::Cursor::new([114, 117, 115, 116]);
        let res = into_byte_stream(reader)
            .try_collect::<Vec<_>>()
            .await?;
        dbg!(res);
    
        let reader = std::io::Cursor::new([114, 117, 115, 116]);
        let res = into_bytes_stream(reader)
            .try_collect::<Vec<_>>()
            .await?;
        dbg!(res);
    
        Ok(())
    }
    

    (OP要求提供TryStream.但是futures-0.3具有impl<S, T, E> TryStream for S where S: Stream<Item = Result<T, E>> + ?Sized,我们免费获得.)

    (OP asked for TryStream. But futures-0.3 has impl<S, T, E> TryStream for S where S: Stream<Item = Result<T, E>> + ?Sized, we get it for free.)

    我提交了一张futures-rs项目的票证,要求为什么.事实证明,这比我最初想象的要复杂得多. TL; DR的意思是,只有在交付通用关联类型(GAT)之后(希望在明年),我们才能令人满意地解决此问题. Niko的异步访谈#2 对此进行了相当深入的讨论.

    I filed a ticket for futures-rs project to ask why. Turns out it is much more complicated than I initially thought. TL;DR is that only after generic associated types (GATs) is shipped, which hopefully will be next year, we can satisfyingly address this problem. Niko's async interview #2 goes into considerable depth to discuss this.

    这篇关于将AsyncRead转换为TryStream字节的最佳方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    相关文章
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆