如何在另一个计算中重用队列?fs2 流 Scala [英] How can I Reuse a queue in another computation ? fs2 stream Scala
本文介绍了如何在另一个计算中重用队列?fs2 流 Scala的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我收到了 Unit
而不是 Stream[IO, String]
的错误.我正在尝试在下一个队列中重用队列的结果
I 'm getting an error of Unit
instead of Stream[IO, String]
.
I'm trying to reuse the result of a queue in the next queue
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
implicit timer: Timer[IO]
) {
def storeInQueueFirst: Stream[IO, Unit] = {
Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
.metered(Random.between(1, 20).seconds)
.through(q1.enqueue)
}
def getFromQueueFirst: Stream[IO, Unit] = {
q1.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue Second $n")))
}
def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
s.map { n =>
n.toString
}
.metered(Random.between(1, 20).seconds)
.through(q2.enqueue)
}
def getFromQueueSecond: Stream[IO, Unit] = {
q2.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue second $n")))
}
}
object Five extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
_ <- b.storeInQueueFirst.compile.drain.start
a <- b.getFromQueueFirst.compile.drain
_ <- b.storeInQueueSecond(a).compile.drain
_ <- b.getFromQueueSecond.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
推荐答案
尝试改变 getFromQueueFirst
使其产生 Stream[IO, Int]
而不是 Stream[IO, Unit]
Try to change getFromQueueFirst
so that it produces Stream[IO, Int]
rather than Stream[IO, Unit]
def getFromQueueFirst: Stream[IO, Int] = {
q1.dequeue
evalTap(n => IO.delay(println(s"Pulling from queue Second $n")))
}
然后
val program = for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
_ <- b.storeInQueueFirst.compile.drain.start
a <- b.getFromQueueFirst.compile.lastOrError
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
_ <- b.getFromQueueSecond.compile.drain
} yield ()
编译.
这篇关于如何在另一个计算中重用队列?fs2 流 Scala的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文