如何将Future转换为Stream? [英] How to convert a Future into a Stream?
问题描述
我正在尝试使用async_std
从网络接收UDP数据报.
I'm trying to use async_std
to receive UDP datagrams from the network.
There is a UdpSocket
that implements async recv_from
, this method returns a future but I need a async_std::stream::Stream
that gives a stream of UDP datagrams because it is a better abstraction.
我找到了 tokio::net::UdpFramed
完全可以满足我的需要,但是在当前版本的tokio中不可用.
I've found tokio::net::UdpFramed
that does exactly what I need but it is not available in current versions of tokio.
一般来说,问题是如何将Future
从给定的异步函数转换为Stream
?
Generally speaking the question is how do I convert Future
s from a given async function into a Stream
?
推荐答案
For a single item, use FutureExt::into_stream
:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
inner().into_stream()
}
async fn inner() -> i32 {
42
}
For a stream from a number of futures generated by a closure, use stream::unfold
:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
stream::unfold((), |()| async { Some((inner().await, ())) })
}
async fn inner() -> i32 {
42
}
根据您的情况,您可以使用stream::unfold
:
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1
fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
stream::unfold(s, |s| {
async {
let data = read_one(&s).await;
Some((data, s))
}
})
}
async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
let mut data = vec![0; 1024];
let (len, _) = s.recv_from(&mut data).await?;
data.truncate(len);
Ok(data)
}
#[async_std::main]
async fn main() -> io::Result<()> {
let s = UdpSocket::bind("0.0.0.0:9876").await?;
read_many(s)
.for_each(|d| {
async {
match d {
Ok(d) => match std::str::from_utf8(&d) {
Ok(s) => println!("{}", s),
Err(_) => println!("{:x?}", d),
},
Err(e) => eprintln!("Error: {}", e),
}
}
})
.await;
Ok(())
}
这篇关于如何将Future转换为Stream?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!