如何以阻塞方式有效地提取 futures::Stream 的第一个元素? [英] How can I efficiently extract the first element of a futures::Stream in a blocking manner?

查看:25
本文介绍了如何以阻塞方式有效地提取 futures::Stream 的第一个元素?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下方法:

pub fn load_names(&self, req: &super::MagicQueryType) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver<String>> {

我的目标是获得 的第一个元素grpcio::ClientSStreamReceiver;我不在乎其他名字:

My goal is to get the very first element of grpcio::ClientSStreamReceiver; I don't care about the other names:

let name: String = load_names(query)?.wait().nth(0)?;

nth(0) 之前调用 wait() 似乎效率低下,因为我相信 wait() 会阻塞流,直到它收到所有元素.

It seems inefficient to call wait() before nth(0) as I believe wait() blocks the stream until it receives all the elements.

如何在不触发构建错误的情况下编写更有效的解决方案(即 nth(0).wait())?Rust 的 futures::stream::Stream 构建错误让我感到非常困惑.

How can I write a more efficient solution (i.e., nth(0).wait()) without triggering build errors? Rust's build errors for futures::stream::Stream look extremely confusing to me.

Rust 游乐场 不支持 grpcio = "0.4.4" 所以我不能提供链接.

The Rust playground doesn't support grpcio = "0.4.4" so I cannot provide a link.

推荐答案

要以阻塞方式提取 futures::Stream 的第一个元素,您应该转换 Streamcode> 通过调用 executor::block_on_stream 然后调用 <代码>迭代器::下一个.

To extract the first element of a futures::Stream in a blocking manner, you should convert the Stream to an iterator by calling executor::block_on_stream and then call Iterator::next.

use futures::{executor, stream, Stream}; // 0.3.4
use std::iter;

fn example() -> impl Stream<Item = i32> {
    stream::iter(iter::repeat(42))
}

fn main() {
    let v = executor::block_on_stream(example()).next();
    println!("{:?}", v);
}

如果您使用的是 Tokio,您可以使用 StreamExt::into_future 并使用 #[tokio::main]:

If you are using Tokio, you can convert the Stream into a Future with StreamExt::into_future and annotate a function with #[tokio::main]:

use futures::{stream, Stream, StreamExt}; // 0.3.4
use std::iter;
use tokio; // 0.2.13

fn example() -> impl Stream<Item = i32> {
    stream::iter(iter::repeat(42))
}

#[tokio::main]
async fn just_one() -> Option<i32> {
    let (i, _stream) = example().into_future().await;
    i
}

fn main() {
    println!("{:?}", just_one());
}

另见:

这篇关于如何以阻塞方式有效地提取 futures::Stream 的第一个元素?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆