如何创建流,其中项基于流先前返回的项? [英] How can I create a stream where the items are based on items that the stream previously returned?

查看:87
本文介绍了如何创建流,其中项基于流先前返回的项?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个基于参数生成futures::Stream的函数.我想多次调用此函数并将流放在一起.麻烦的是,我想将流返回的值作为原始函数的参数反馈回去.

I have a function that generates a futures::Stream based on an argument. I want to call this function multiple times and flatten the streams together. Complicating matters is the fact that I want to feed the values returned by the stream back as the argument to the original function.

具体来说,我有一个函数可以返回数字流,直到零为止:

Concretely, I have a function that returns a stream of numbers down to zero:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

我想从5开始调用此函数.还应该为返回的每个奇数调用该函数.呼叫numbers_down_to_zero的总次数为:

I want to call this function starting at 5. The function should also be called for every odd value that is returned. The total set of calls to numbers_down_to_zero would be:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

产生

4
3
2
1
0
2
1
0
0
0

有什么技术可以做到这一点?

What techniques exist to allow this?

推荐答案

通过(ab)使用异步/等待, genawaiter板条箱如今能够在稳定的Rust中模仿生成器语法.结合futures::pin_mut在栈上固定值,这是一种无需分配且与任意流兼容的解决方案:

By (ab)using async / await, the genawaiter crate manages to mimic generator syntax in stable Rust today. Combined with futures::pin_mut to pin value on the stack, here is a solution both allocation-free and compatible with arbitrary streams:

//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;

async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

一些缺点:

  • generator_mut宏内有一个不安全的呼叫
  • 接口有点漏水.调用者可以看到一些实现细节.
  • there is one unsafe call inside generator_mut macro
  • the interface is a bit leaky. The callers get to see some implementation details.

使用一个堆分配, genawaiter::rc::Gen 可以摆脱所有这些.但是同样,在表上进行分配还有其他选择.

With one heap allocation, genawaiter::rc::Gen can get rid of all these. But again, with allocation on the table there are other options.

use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;

fn g(n: i32) -> impl Stream<Item = i32> {
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}

这篇关于如何创建流,其中项基于流先前返回的项?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆