有没有办法创建一个异步流生成器来产生重复调用函数的结果? [英] Is there any way to create a async stream generator that yields the result of repeatedly calling a function?
问题描述
我想构建一个程序来收集天气更新并将它们表示为流.我想在无限循环中调用 get_weather()
,在 finish 和 start 之间有 60 秒的延迟.
I want to build a program that collects weather updates and represents them as a stream. I want to call get_weather()
in an infinite loop, with 60 seconds delay between finish and start.
简化版本如下所示:
async fn get_weather() -> Weather { /* ... */ }
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
loop {
tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
let weather = get_weather().await;
yield weather; // This is not supported
// Note: waiting for get_weather() stops the timer and avoids overflows.
}
}
有什么方法可以轻松做到这一点吗?
Is there any way to do this easily?
当 get_weather()
花费超过 60 秒时,使用 tokio::timer::Interval
将不起作用:
Using tokio::timer::Interval
will not work when get_weather()
takes more than 60 seconds:
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
.then(|| get_weather())
}
如果发生这种情况,下一个功能将立即启动.我想在上一个 get_weather()
开始和下一个 get_weather()
开始之间保持 60 秒.
If that happens, the next function will start immediately. I want to keep exactly 60 seconds between the previous get_weather()
start and the next get_weather()
start.
推荐答案
使用 stream::unfold
从未来世界"到流世界".我们不需要任何额外的状态,所以我们使用空元组:
Use stream::unfold
to go from the "world of futures" to the "world of streams". We don't need any extra state, so we use the empty tuple:
use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11
struct Weather;
async fn get_weather() -> Weather {
Weather
}
const BETWEEN: Duration = Duration::from_secs(1);
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
futures::stream::unfold((), |_| async {
time::delay_for(BETWEEN).await;
let weather = get_weather().await;
Some((weather, ()))
})
}
#[tokio::main]
async fn main() {
get_weather_stream()
.take(3)
.for_each(|_v| async {
println!("Got the weather");
})
.await;
}
% time ./target/debug/example
Got the weather
Got the weather
Got the weather
real 3.085 3085495us
user 0.004 3928us
sys 0.003 3151us
另见:
这篇关于有没有办法创建一个异步流生成器来产生重复调用函数的结果?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!