如何仅从期货序列中读取成功值 [英] How to read only Successful values from a Seq of Futures

查看:78
本文介绍了如何仅从期货序列中读取成功值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习akka / scala,并尝试仅读取从 Seq [Future [Int]]后继的 Future s 却无法工作。

I am learning akka/scala and am trying to read only those Futures that succeeded from a Seq[Future[Int]] but cant get anything to work.


  1. 我模拟了10个 Future [Int] 的数组,其中一些失败取决于值 FailThreshold 的值(全部失败10,没有失败0)。

  2. 然后我尝试将它们读入ArrayBuffer(找不到用值返回不可变结构的方法)。

  3. 此外,对于成功/失败没有过滤器,因此必须在每个Future上运行 onComplete 并将更新缓冲区作为

  4. 即使 FailThreshold = 0 且Seq都将Future都设置为Success时,数组缓冲区有时还是为空

  1. I simulated an array of 10 Future[Int] some of which fail depending on the value FailThreshold takes (all fail for 10 and none fail for 0).
  2. I then try to read them into an ArrayBuffer (could not find a way to return immutable structure with the values).
  3. Also, there isn't a filter on Success/Failure so had to run an onComplete on each future and update buffer as a side-effect.
  4. Even when the FailThreshold=0 and the Seq has all Future set to Success, the array buffer is sometimes empty and different runs return array of different sizes.

我尝试了网络上的其他一些建议,例如使用 Future列表上的.sequence ,但是如果将来任何变量失败,都会抛出异常。

I tried a few other suggestions from the web like using Future.sequence on the list but this throws exception if any of future variables fail.


import akka.actor._
import akka.pattern.ask
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Timeout, Failure, Success}
import concurrent.ExecutionContext.Implicits.global

case object AskNameMessage
implicit val timeout = Timeout(5, SECONDS) 

val FailThreshold = 0

class HeyActor(num: Int) extends Actor {
    def receive = {
        case AskNameMessage => if (num<FailThreshold) {Thread.sleep(1000);sender ! num} else sender ! num
    }
}

class FLPActor extends Actor {
    def receive = {
        case t: IndexedSeq[Future[Int]] => {
            println(t)
            val b = scala.collection.mutable.ArrayBuffer.empty[Int]
            t.foldLeft( b ){ case (bf,ft) => 
                ft.onComplete { case Success(v) => bf += ft.value.get.get }
                bf
            }
            println(b)

        }
    }
}
val system = ActorSystem("AskTest")
val flm = (0 to 10).map( (n) => system.actorOf(Props(new HeyActor(n)), name="futureListMake"+(n)) )
val flp = system.actorOf(Props(new FLPActor), name="futureListProcessor")

// val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.failed( throw new IllegalArgumentException("DONE!") ))
val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.successful(0))
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
flp ! seqOfFtrs

接收 FLPActor 主要获得

Vector(Future(Success(0)), Future(Success(1)), Future(Success(2)), Future(Success(3)), Future(Success(4)), Future(Success(5)), Future(Success(6)), Future(Success(7)), Future(Success(8)), Future(Success(9)), Future(Success(10)))

,但数组缓冲区 b 具有不同数量的值,有时为空。

but the array buffer b has varying number of values and empty at times.

有人可以在这里指出我的空白吗?

Can someone please point me to gaps here,


  • 为什么即使所有Future都已成功解决,数组缓冲区的大小也会变化,

  • 当我们想通过TimeOut 询问不同的参与者并仅使用那些成功返回的请求时,正确使用的模式是什么

  • why would the array buffer have varying sizes even when all Future have resolved to Success,
  • what is the correct pattern to use when we want to ask different actors with TimeOut and use only those asks that have successfully returned for further processing.

推荐答案

您可以直接发送IndexedSeq [Future [Int]]而不是应该转变为Fut ure [IndexedSeq [Int]],然后将其通过管道传递到下一个参与者。您不会将期货直接发送给演员。

Instead of directly sending the IndexedSeq[Future[Int]], you should transform to Future[IndexedSeq[Int]] and then pipe it to the next actor. You don't send the Futures directly to an actor. You have to pipe it.

HeyActor可以保持不变。

HeyActor can stay unchanged.

之后

val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )

进行恢复,并使用Future.sequence将其转换为一个Future:

do a recover, and use Future.sequence to turn it into one Future:

val oneFut = Future.sequence(seqOfFtrs.map(f=>f.map(Some(_)).recover{ case (ex: Throwable) => None})).map(_.flatten)

如果您对Some,None和Flatten不了解业务,请确保您了解Option类型。从序列中删除值的一种方法是将序列中的值映射到Option(某些或无),然后展平序列。

If you don't understand the business with Some, None, and flatten, then make sure you understand the Option type. One way to remove values from a sequence is to map values in the sequence to Option (either Some or None) and then to flatten the sequence. The None values are removed and the Some values are unwrapped.

将数据转换为单个Future之后,将其传递到FLPActor:

After you have transformed your data into a single Future, pipe it over to FLPActor:

oneFut pipeTo flp

FLPActor应该使用以下接收函数重写:

FLPActor should be rewritten with the following receive function:

def receive = {
  case printme: IndexedSeq[Int] => println(printme)
}

在Akka中,修改您的主线程中的某些状态Future的演员或Future的onComplete是一个很大的禁忌。在最坏的情况下,这会导致比赛条件。请记住,每个Future在其自己的线程上运行,因此在actor内部运行Future意味着您需要在不同的线程中同时进行工作。在演员也在处理某些状态的同时,让Future直接修改您的演员中的某些状态是灾难的根源。在Akka中,您可以将所有更改直接在主要执行者的执行主线程中处理为状态。如果将来有一些工作要做,并且需要从角色的主线程访问该工作,则可以将其通过管道传递给该角色。 pipeTo模式对于访问Future的完成计算是有效,正确且安全的。

In Akka, modifying some state in the main thread of your actor from a Future or the onComplete of a Future is a big no-no. In the worst case, it results in race conditions. Remember that each Future runs on its own thread, so running a Future inside an actor means you have concurrent work being done in different threads. Having the Future directly modify some state in your actor while the actor is also processing some state is a recipe for disaster. In Akka, you process all changes to state directly in the primary thread of execution of the main actor. If you have some work done in a Future and need to access that work from the main thread of an actor, you pipe it to that actor. The pipeTo pattern is functional, correct, and safe for accessing the finished computation of a Future.

为回答有关FLPActor为什么未正确打印IndexedSeq的问题:在期货完成之前打印出ArrayBuffer。在这种情况下,onComplete不是正确的惯用语,因此应避免使用它,因为它不是很好的功能样式。

To answer your question about why FLPActor is not printing out the IndexedSeq correctly: you are printing out the ArrayBuffer before your Futures have been completed. onComplete isn't the right idiom to use in this case, and you should avoid it in general as it isn't good functional style.

不要忘记导入akka.pattern.pipe用于pipeTo语法。

Don't forget the import akka.pattern.pipe for the pipeTo syntax.

这篇关于如何仅从期货序列中读取成功值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆