如何将迭代器成功转换为流或将失败转换为空流? [英] How do I convert an iterator into a stream on success or an empty stream on failure?

查看:97
本文介绍了如何将迭代器成功转换为流或将失败转换为空流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用一个常规迭代器并打开放入流,这样我就可以进行进一步的流处理.问题是我可能要处理迭代器或错误.我想我与此非常接近:

I'd like to take a regular iterator and turn it into a stream so that I can do further stream processing. The trouble is that I may have an iterator or an error to deal with. I think I'm pretty close with this:

#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;

use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};

fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
            Some(a) => Some(future::ok((a, iter))),
            None => None,
        }),
        Err(e) => {
            error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
            stream::empty()
        }
    }
}

fn main() {
    let task = resolve("1.2.3.4:12345")
        .map_err(|e| error!("{:?}", e))
        .for_each(|addr| info!("{:?}", addr))
        .fold();
    tokio::run(task);
}

游乐场

error[E0308]: match arms have incompatible types
  --> src/main.rs:12:5
   |
12 | /     match addrs.to_socket_addrs() {
13 | |         Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | |             Some(a) => Some(future::ok((a, iter))),
15 | |             None => None,
...  |
20 | |         }
21 | |     }
   | |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
   |
   = note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
              found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
  --> src/main.rs:17:19
   |
17 |           Err(e) => {
   |  ___________________^
18 | |             error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | |             stream::empty()
20 | |         }
   | |_________^

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:27:10
   |
27 |         .for_each(|addr| info!("{:?}", addr))
   |          ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
  --> src/main.rs:28:10
   |
28 |         .fold();
   |          ^^^^
   |
   = note: the method `fold` exists but the following trait bounds were not satisfied:
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`

提示非常明显.我从match返回的两个Result不同,并且应该相同.现在,我该怎么做才能返回流?

The hint is pretty obvious. The two Results I'm returning from the match differ and should be the same. Now, how can I do that so that I return a stream?

推荐答案

Rust是一种静态类型的语言,这意味着函数的返回类型必须是在编译时已知的单一类型.您正在尝试返回多个类型,这些类型是在运行时决定的.

Rust is a statically typed language which means that the return type of a function has to be a single type, known at compile time. You are attempting to return multiple types, decided at runtime.

最接近原始版本的解决方案是始终返回Unfold流:

The closest solution to your original is to always return the Unfold stream:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::unfold(addrs.to_socket_addrs(), |r| {
        match r {
            Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
            Err(_) => None,
        }
    })
}


但是为什么要重新发明轮子呢?


But why reinvent the wheel?

futures::stream::iter_ok

Iterator转换为Stream,始终准备产生下一个值.

Converts an Iterator into a Stream which is always ready to yield the next value.

期货板条箱的后续版本为Either实现了Stream,这使其非常美观:

Subsequent versions of the futures crate implement Stream for Either, which makes this very elegant:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::iter_ok(iter).left_stream(),
        Err(_) => stream::empty().right_stream(),
    }
}

将此功能回溯到期货0.1很简单(也许有人应该将其作为PR提交给那些坚持0.1的人...):

It's straightforward to backport this functionality to futures 0.1 (maybe someone should submit it as a PR for those who are stuck on 0.1...):

enum MyEither<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Stream for MyEither<L, R>
where
    L: Stream,
    R: Stream<Item = L::Item, Error = L::Error>,
{
    type Item = L::Item;
    type Error = L::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self {
            MyEither::Left(l) => l.poll(),
            MyEither::Right(r) => r.poll(),
        }
    }
}

trait EitherStreamExt {
    fn left_stream<R>(self) -> MyEither<Self, R>
    where
        Self: Sized;
    fn right_stream<L>(self) -> MyEither<L, Self>
    where
        Self: Sized;
}

impl<S: Stream> EitherStreamExt for S {
    fn left_stream<R>(self) -> MyEither<Self, R> {
        MyEither::Left(self)
    }
    fn right_stream<L>(self) -> MyEither<L, Self> {
        MyEither::Right(self)
    }
}


更好的是,使用Result是迭代器并且Stream::flatten存在的事实:


Even better, use the fact that Result is an iterator and Stream::flatten exists:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::iter_ok(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .flatten()
}

或者如果您真的要打印错误:

Or if you really want to print errors:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::once(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .map_err(|e| eprintln!("err: {}", e))
        .flatten()
}

另请参阅:

  • Conditionally return empty iterator from flat_map
  • Conditionally iterate over one of several possible iterators
  • What is the correct way to return an Iterator (or any other trait)?

这篇关于如何将迭代器成功转换为流或将失败转换为空流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆