使用返回Future的函数映射Stream [英] mapping a Stream with a function returning a Future

查看:79
本文介绍了使用返回Future的函数映射Stream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有时我会遇到自己想要合并为Future[Stream[Y]]Stream[X]function X => Future Y的情况,但似乎找不到解决方法.例如,我有

I sometimes find myself in a situation where I have some Stream[X], and a function X => Future Y, that I'd like to combine to a Future[Stream[Y]], and I can't seem to find a way to do it. For example, I have

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

我尝试了

 val result = Future.Traverse(x, toFutureString)

给出正确的结果,但似乎在返回Future之前消耗了整个流,这或多或少地击败了钱包

which gives the correct result, but seems to consume the entire stream before returning the Future, which more or less defeats the purpse

我尝试过

val result = x.flatMap(toFutureString)

但是不能用type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString)

返回有些奇怪且无用的Stream[Future[String]]

returns the somewhat odd and useless Stream[Future[String]]

在这里我应该怎么做才能解决问题?

What should I do here to get things fixed?

我不会卡在Stream上,只要在开始处理头部之前不会阻塞评估所有项目,我对Iterator上的相同操作也会同样满意.

I'm not stuck on a Stream, I'd be equally happy with the same operation on an Iterator, as long as it won't block on evaluating all items before starting to process the head

Edit2:我不确定100%确定Future.Traverse构造是否需要在返回Future [Stream]之前遍历整个流,但我认为确实如此.如果不是这样,那么这本身就是一个很好的答案.

I'm not 100% sure that the Future.Traverse construct needs to traverse the entire stream before returning a Future[Stream], but I think it does. If it doesn't, that's a fine answer in itself.

Edit3:我也不需要按顺序排列结果,我可以按返回的流或迭代器顺序排列.

I also don't need the result to be in order, I'm fine with the stream or iterator returned being whatever order.

推荐答案

使用traverse的方向正确,但不幸的是,在这种情况下,标准库的定义似乎有些破损,不应返回之前需要消耗流.

You're on the right track with traverse, but unfortunately it looks like the standard library's definition is a little broken in this case—it shouldn't need to consume the stream before returning.

Future.traverse是更为通用的功能的特定版本,该功能可用于以可遍历"类型包装的任何应用函子(请参见论文或我的答案此处).

Future.traverse is a specific version of a much more general function that works on any applicative functor wrapped in a "traversable" type (see these papers or my answer here for more information, for example).

Scalaz 库提供了这个更通用的版本,在这种情况下,它可以按预期工作(请注意我正在从 scalaz-contrib 获取Future的应用仿函数实例;不是但是在Scalaz的稳定版本中,该版本仍与Scala 2.9.2交叉构建,而Scala 2.9.2没有此Future):

The Scalaz library provides this more general version, and it works as expected in this case (note that I'm getting the applicative functor instance for Future from scalaz-contrib; it's not yet in the stable versions of Scalaz, which are still cross-built against Scala 2.9.2, which doesn't have this Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

这会立即在无限流上返回,因此我们可以确定它并没有首先消耗.

This returns immediately on an infinite stream, so we know for sure that it's not being consuming first.

作为脚注:如果您查看

As a footnote: If you look at the source for Future.traverse you'll see that it's implemented in terms of foldLeft, which is convenient, but not necessary or appropriate in the case of streams.

这篇关于使用返回Future的函数映射Stream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆