fs2相关内容

我如何“拆分"?fs2 中的流?

我想做这样的事情: def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) =(流,stream.map(拆分) 但这不起作用,因为它从源“拉"了两次 - 当我同时排出 stream 和 stream.map(split) 时,每次一次.我如何防止这种情况?以某种方 ..
发布时间:2021-07-15 21:08:25 其他开发

为什么我的代码没有返回任何东西?斯卡拉 fs2

该程序允许将 Mapping Ints 推送到 Double 并识别队列中的退出时间.该程序未显示任何错误,但未打印任何内容.我错过了什么? import cat.effect.{ExitCode, IO, IOApp, Timer}导入 fs2._导入 fs2.concurrent.Queue导入 scala.concurrent.duration._导入 scala.util.Randomc ..
发布时间:2021-06-30 19:33:08 其他开发

模拟返回 fs2.Stream 的方法

为什么不能用 Mockito 模拟来模拟返回 fs2.Stream 的方法? Mockito 抱怨我试图返回 FreeC 而不是 Stream.为什么会这样,我怎样才能让它工作? 以下代码: 导入cats.effect.IO导入 fs2.Stream导入 org.mockito.Mockito.when导入 org.scalatest.FlatSpec导入 org.scalatest ..
发布时间:2021-06-03 18:47:38 其他开发

带有StateT [IO,_,_]的FS2流,定期转储状态

我有一个程序,它消耗无限的数据流.在此过程中,我想记录一些指标,由于它们只是简单的总和和平均值,因此形成了一个monoid.我想定期地在某个地方写出这些指标,清除它们,然后返回累积它们.我基本上是: object Foo { type MetricsIO[A] = StateT[IO, MetricData, A] def recordMetric(m: MetricData): ..
发布时间:2020-07-19 19:32:35 其他开发