如何将Future转换为Stream? [英] How to convert a Future into a Stream?

查看:94
本文介绍了如何将Future转换为Stream?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用async_std从网络接收UDP数据报.

I'm trying to use async_std to receive UDP datagrams from the network.

有一个UdpSocket实现了

There is a UdpSocket that implements async recv_from, this method returns a future but I need a async_std::stream::Stream that gives a stream of UDP datagrams because it is a better abstraction.

我找到了 tokio::net::UdpFramed 完全可以满足我的需要,但是在当前版本的tokio中不可用.

I've found tokio::net::UdpFramed that does exactly what I need but it is not available in current versions of tokio.

一般来说,问题是如何将Future从给定的异步函数转换为Stream?

Generally speaking the question is how do I convert Futures from a given async function into a Stream?

推荐答案

对于单个项目,请使用

For a single item, use FutureExt::into_stream:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

对于关闭产生的大量期货的流,请使用

For a stream from a number of futures generated by a closure, use stream::unfold:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}


根据您的情况,您可以使用stream::unfold:

use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}

这篇关于如何将Future转换为Stream?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆