使用 Scala 和 Akka 在不丢失数据的情况下使用服务器发送的事件(SSE) [英] Consuming Server Sent Events(SSE) without losing data using scala and Akka

查看:44
本文介绍了使用 Scala 和 Akka 在不丢失数据的情况下使用服务器发送的事件(SSE)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在生产速率为 > 时消费 SSE 事件而不丢失任何数据.消费率.由于 SSE 支持背压 Akka 应该可以做到.我尝试了几种不同的方法,但额外的消息被丢弃了.

<预><代码>@单身人士类 SseConsumer @Inject()()(隐式 ec: ExecutionContext) {隐式 val 系统 = ActorSystem()val 发送:HttpRequest =>未来[HttpResponse] = foodef foo(x: HttpRequest) = {尝试 {val authHeader = Authorization(BasicHttpCredentials("user", "pass"))val newHeaders = x.withHeaders(authHeader)Http().singleRequest(newHeaders)} 抓住 {情况e:异常=>{println("Exceptio12n", e.printStackTrace())扔e}}}val eventSource2: Source[ServerSentEvent, NotUsed] =事件源(uri = Uri("https://xyz/a/events/user"),发送,initialLastEventId = Some(2"),重试延迟 = 1.second)def orderStatusEventStable() = {val 事件:Future[immutable.Seq[ServerSentEvent]] =事件源2.throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping).take(5).runWith(Sink.seq)events.map(_.foreach(x => {//TODO: 推送到 sqs打印(456")打印(x.数据)}))}未来 {阻塞{而(真){尝试 {等待结果(orderStatusEventStable()恢复{情况e:异常=>{println("异常", e)扔e}}, 持续时间.Inf)} 抓住 {情况e:异常=>{println("异常", e.printStackTrace())}}}}}}

此代码有效,但存在以下问题:

  1. 由于 .take(5) 当消耗率<生产速度,我正在丢弃事件.
  2. 此外,我想在收到每条消息时对其进行处理,并且不想等到 5 条消息到达.我该怎么做?
  3. 我必须在 while 循环中编写使用者.这似乎不是基于事件,而是轮询(非常类似于使用分页和限制为 5 调用 GET)
  4. 我不确定节流,尝试阅读文档,但它非常令人困惑.如果我不想丢失任何事件,节流是正确的方法吗?我预计高峰时段的速率为 5000 请求/秒,否则为 10 请求/秒.当生产率很高时,我理想情况下想要应用背压.节流是正确的方法吗?根据文档,它似乎是正确的,因为它说Backpressures when下游背压或传入速率高于速度限制

解决方案

为了让 Akka Stream 回压工作,你必须只使用一个源,而不是每次都用一个新的源重新创建一种轮询.

>

忘记你的循环和你的def orderStatusEventStable.

只做这样的事情(一次):

eventSource2.operator(event =>/* 做某事 */).otherOperatorMaybe()....runWith(Sink.foreach(println))

operatorotherOperatorMaybe 是 Akka Stream 上的操作,具体取决于您要实现的目标(例如 throttletake 在您的原始代码中)).

运营商列表:https://doc.akka.io/docs/akka/current/stream/operators/index.html

Akka Stream 功能强大,但您需要花一些时间来了解它

I want to consume SSE events without losing any data when the rate of production is > rate of consumption. Since SSE supports backpressure Akka should be able to do it. I tried a few different ways but the extra messages are being dropped.


@Singleton
class SseConsumer @Inject()()(implicit ec: ExecutionContext) {

  implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] = foo

  def foo(x: HttpRequest) = {
    try {
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)
    } catch {
      case e: Exception => {
        println("Exceptio12n", e.printStackTrace())
        throw e
      }
    }
  }


  val eventSource2: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://xyz/a/events/user"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )


  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource2
        .throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(5)
        .runWith(Sink.seq)
    events.map(_.foreach(x => {
      // TODO: push to sqs
      println("456")
      println(x.data)
    }))
  }


  Future {
    blocking {
      while (true) {
        try {
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e: Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }
}

This code works but with the following problems:

  1. Due to .take(5) when rate of consumption < rate of production, I am dropping events.
  2. Also I want to process each message as it comes, and don't want to wait until 5 messages have reached. How can I do that ?
  3. I have to write the consumer in a while loop. This does not seem event based, rather looks like polling (very similar to calling GET with pagination and limit of 5)
  4. I am not sure about throttling, tried reading the docs but its very confusing. If I don't want to lose any events, is throttling the right approach? I am expecting a rate of 5000 req / sec in peak hours and 10 req/sec otherwise. When the production rate is high I would I ideally want to apply backpressure. Is throttling the correct approach for that ? According to docs it seems correct as it says Backpressures when downstream backpressures or the incoming rate is higher than the speed limit

解决方案

In order for Akka Stream back pressuring to work, you have to use only one source instead of recreating a kind of polling with a new source each time.

Forget about your loop and your def orderStatusEventStable.

Only do something like this (once):

eventSource2
  .operator(event => /* do something */ )
  .otherOperatorMaybe()
  ...
  .runWith(Sink.foreach(println))

With operator and otherOperatorMaybe being operations on Akka Stream depending on what you want to achieve (like throttle and take in your original code).

List of operators: https://doc.akka.io/docs/akka/current/stream/operators/index.html

Akka Stream is powerful but you need to take some time to learn about it

这篇关于使用 Scala 和 Akka 在不丢失数据的情况下使用服务器发送的事件(SSE)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆