为什么我的代码没有返回任何东西?斯卡拉 fs2 [英] why is my code not returning anything ? Scala fs2

查看:79
本文介绍了为什么我的代码没有返回任何东西?斯卡拉 fs2的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

该程序允许将 Mapping Ints 推送到 Double 并识别队列中的退出时间.该程序未显示任何错误,但未打印任何内容.我错过了什么?

The program permits pushing Mapping Ints to Double and identifying the exit time from the queue. The program is not showing any error but It is not printing anything. What am I missing ?

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      _ <- b.storeInQueue.compile.drain

    } yield ()
    program.as(ExitCode.Success)
  }
}

推荐答案

IO 是惰性求值的 - 要执行某事,它必须是创建最终 IO 值的表达式的一部分.

IO is evaluated lazily - for something to get executed it has to be a part of expression that created the final IO value.

这里:

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream ... // no side effects are run when we create this!

    q1.dequeue ... // not using scheduledStream
  }

value scheduledStream 根本没有使用,所以它不是从 storeInQueue 返回的值的一部分",所以当 IOApp 转IO 值进入计算,你的程序的配方不包含消息推送到队列的部分,所以队列总是空的.

value scheduledStream is not used at all, so it isn't "a part" of value returned from storeInQueue so when IOApp turns IO value into computations, the recipe for your program doesn't contain the part where messages are pushed to queue, so the queue is always empty.

订阅队列的部分有效,但由于没有任何东西落在队列中,它一直在等待永远不会到达的项目.

The part which subscribes to queue works, but since nothing ever lands on queue it keeps on waiting for items that will never arrive.

您必须通过使它们成为一个 IO 值的一部分"来启动两个流,例如像这样:

You would have to start both streams by "making them part of one IO value", e.g. like this:

class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue =
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

  def takeFromQueue =
    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
      pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
    } yield ()
    program.as(ExitCode.Success)
  }
}

这篇关于为什么我的代码没有返回任何东西?斯卡拉 fs2的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆