有什么方法可以创建异步流生成器,从而产生重复调用函数的结果? [英] Is there any way to create a async stream generator that yields the result of repeatedly calling a function?
问题描述
我想构建一个收集天气更新并将其表示为流的程序.我想无限循环地调用get_weather()
,在完成和开始之间有60秒的延迟.
I want to build a program that collects weather updates and represents them as a stream. I want to call get_weather()
in an infinite loop, with 60 seconds delay between finish and start.
简化版本如下:
async fn get_weather() -> Weather { /* ... */ }
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
loop {
tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
let weather = get_weather().await;
yield weather; // This is not supported
// Note: waiting for get_weather() stops the timer and avoids overflows.
}
}
有什么方法可以轻松地做到这一点吗?
Is there any way to do this easily?
get_weather()
花费60秒以上时,无法使用tokio::timer::Interval
:
Using tokio::timer::Interval
will not work when get_weather()
takes more than 60 seconds:
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
.then(|| get_weather())
}
如果发生这种情况,下一个功能将立即启动.我想在上一个get_weather()
开始和下一个get_weather()
开始之间保持准确的60秒.
If that happens, the next function will start immediately. I want to keep exactly 60 seconds between the previous get_weather()
start and the next get_weather()
start.
推荐答案
Use stream::unfold
to go from the "world of futures" to the "world of streams". We don't need any extra state, so we use the empty tuple:
use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11
struct Weather;
async fn get_weather() -> Weather {
Weather
}
const BETWEEN: Duration = Duration::from_secs(1);
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
futures::stream::unfold((), |_| async {
time::delay_for(BETWEEN).await;
let weather = get_weather().await;
Some((weather, ()))
})
}
#[tokio::main]
async fn main() {
get_weather_stream()
.take(3)
.for_each(|_v| async {
println!("Got the weather");
})
.await;
}
% time ./target/debug/example
Got the weather
Got the weather
Got the weather
real 3.085 3085495us
user 0.004 3928us
sys 0.003 3151us
另请参阅:
- How do I convert an iterator into a stream on success or an empty stream on failure?
- How do I iterate over a Vec of functions returning Futures in Rust?
- Creating a stream of values while calling async fns?
这篇关于有什么方法可以创建异步流生成器,从而产生重复调用函数的结果?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!