有没有办法创建一个异步流生成器来产生重复调用函数的结果? [英] Is there any way to create a async stream generator that yields the result of repeatedly calling a function?

查看:17
本文介绍了有没有办法创建一个异步流生成器来产生重复调用函数的结果?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想构建一个程序来收集天气更新并将它们表示为流.我想在无限循环中调用 get_weather(),在 finishstart 之间有 60 秒的延迟.

I want to build a program that collects weather updates and represents them as a stream. I want to call get_weather() in an infinite loop, with 60 seconds delay between finish and start.

简化版本如下所示:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}

有什么方法可以轻松做到这一点吗?

Is there any way to do this easily?

get_weather() 花费超过 60 秒时,使用 tokio::timer::Interval 将不起作用:

Using tokio::timer::Interval will not work when get_weather() takes more than 60 seconds:

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|| get_weather())
}

如果发生这种情况,下一个功能将立即启动.我想在上一个 get_weather() 开始和下一个 get_weather() 开始之间保持 60 秒.

If that happens, the next function will start immediately. I want to keep exactly 60 seconds between the previous get_weather() start and the next get_weather() start.

推荐答案

使用 stream::unfold 从未来世界"到流世界".我们不需要任何额外的状态,所以我们使用空元组:

Use stream::unfold to go from the "world of futures" to the "world of streams". We don't need any extra state, so we use the empty tuple:

use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}

% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.085   3085495us
user    0.004   3928us
sys     0.003   3151us

另见:

这篇关于有没有办法创建一个异步流生成器来产生重复调用函数的结果?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆