带有StateT [IO,_,_]的FS2流,定期转储状态 [英] FS2 Stream with StateT[IO, _, _], periodically dumping state

查看:137
本文介绍了带有StateT [IO,_,_]的FS2流,定期转储状态的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个程序,它消耗无限的数据流.在此过程中,我想记录一些指标,由于它们只是简单的总和和平均值,因此形成了一个monoid.我想定期地在某个地方写出这些指标,清除它们,然后返回累积它们.我基本上是:

I have a program which consumes an infinite stream of data. Along the way I'd like to record some metrics, which form a monoid since they're just simple sums and averages. Periodically, I want to write out these metrics somewhere, clear them, and return to accumulating them. I have essentially:

object Foo {
  type MetricsIO[A] = StateT[IO, MetricData, A]

  def recordMetric(m: MetricData): MetricsIO[Unit] = {
    StateT.modify(_.combine(m))
  }

  def sendMetrics: MetricsIO[Unit] = {
    StateT.modifyF { s =>
      val write: IO[Unit] = writeMetrics(s)
      write.attempt.map {
        case Left(_) => s
        case Right(_) => Monoid[MetricData].empty
      }
    }
  }
}

因此大多数执行直接使用IO并使用StateT.liftF进行提升.在某些情况下,我包括对recordMetric的一些调用.最后,我得到了一个流:

So most of the execution uses IO directly and lifts using StateT.liftF. And in certain situations, I include some calls to recordMetric. At the end of it I've got a stream:

val mainStream: Stream[MetricsIO, Bar] = ...

我想定期(每分钟左右)转储指标,所以我尝试了:

And I want to periodically, say every minute or so, dump the metrics, so I tried:

val scheduler: Scheduler = ...
val sendStream =
  scheduler
    .awakeEvery[MetricsIO](FiniteDuration(1, TimeUnit.Minutes))
    .evalMap(_ => Foo.sendMetrics)

val result = mainStream.concurrently(sendStream).compile.drain

然后我执行通常的顶层程序,即以开始状态调用run,然后调用unsafeRunSync.

And then I do the usual top level program stuff of calling run with the start state and then calling unsafeRunSync.

问题是,我只能看到空的指标!我怀疑这与我的monoid隐式地为sendStream提供空指标有关,但我无法完全弄清楚为什么应该这样做或如何解决.也许有一种方法可以代替我将这些sendMetrics调用插入"主流中?

The issue is, I only ever see empty metrics! I suspect it's something to with my monoid implicitly providing empty metrics to sendStream but I can't quite figure out why that should be or how to fix it. Maybe there's a way I can "interleave" these sendMetrics calls into the main stream instead?

这是一个最小的完整可运行示例:

import fs2._
import cats.implicits._
import cats.data._
import cats.effect._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

val sec = Executors.newScheduledThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(sec)

type F[A] = StateT[IO, List[String], A]

val slowInts = Stream.unfoldEval[F, Int, Int](1) { n =>
  StateT(state => IO {
    Thread.sleep(500)
    val message = s"hello $n"
    val newState = message :: state
    val result = Some((n, n + 1))
    (newState, result)
  })
}

val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[F](FiniteDuration(1, SECONDS))

val slowIntsPeriodicallyClearedState = slowInts.either(ticks).evalMap[Int] {
  case Left(n) => StateT.liftF(IO(n))
  case Right(_) => StateT(state => IO {
    println(state)
    (List.empty, -1)
  })
}

现在,如果我这样做:

slowInts.take(10).compile.drain.run(List.empty).unsafeRunSync

然后我得到了预期的结果-状态正确地累加到输出中.但是,如果我这样做:

Then I get the expected result - the state properly accumulates into the output. But if I do:

slowIntsPeriodicallyClearedState.take(10).compile.drain.run(List.empty).unsafeRunSync

然后我看到一个一致打印出来的空白列表.我希望可以打印出部分列表(大约2个元素).

Then I see an empty list consistently printed out. I would have expected partial lists (approx. 2 elements) printed out.

推荐答案

StateT与效果类型一起使用并不安全,因为面对并发访问它并不安全.相反,请考虑使用Ref(根据fs2或cats-effect,取决于版本).

StateT is not safe to use with effect types, because it's not safe in the face of concurrent access. Instead, consider using a Ref (from either fs2 or cats-effect, depending what version).

类似这样的东西:

def slowInts(ref: Ref[IO, Int]) = Stream.unfoldEval[F, Int, Int](1) { n =>
  val message = s"hello $n"
  ref.modify(message :: _) *> IO {
    Thread.sleep(500)
    val result = Some((n, n + 1))
    result
  }
}

val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[IO](FiniteDuration(1, SECONDS))

def slowIntsPeriodicallyClearedState(ref: Ref[IO, Int] = 
  slowInts.either(ticks).evalMap[Int] {
    case Left(n) => IO.pure(n)
    case Right(_) =>
      ref.modify(_ => Nil).flatMap { case Change(previous, now) => 
        IO(println(now)).as(-1)
      }
  }

这篇关于带有StateT [IO,_,_]的FS2流,定期转储状态的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆