Actorpublisher作为handleMessagesWithSinkSource中的源 [英] Actorpublisher as source in handleMessagesWithSinkSource

查看:120
本文介绍了Actorpublisher作为handleMessagesWithSinkSource中的源的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是AKKA Streams的新手。 (使用Akka v 2.4.4)
我正在尝试创建一个Websocket,该Websocket可以将新的通知推送到订阅的客户端。我的策略是实现一个ActorPublisher,稍后我可以将其发送到客户端,然后再将其推送给客户端。

I'm new to AKKA Streams. (Using Akka v 2.4.4) I am trying to create a Websocket which can push new notifications to subscribed clients. My strategy is to implement a ActorPublisher, which I later can send a message to, and then get it pushed to clients.

首先,我复制了一个ActorPublisher的示例。 :

To get started I copied an example of a ActorPublisher:

case class Tick()

class TickActor extends ActorPublisher[Int] {
  import scala.concurrent.duration._

  implicit val ec = context.dispatcher

  val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())`

  var cnt = 0
  var buffer = Vector.empty[Int]

  override def receive: Receive = {
    case Tick() => {
      cnt = cnt + 1
      if (buffer.isEmpty && totalDemand > 0) {
        onNext(cnt)
      }
      else {
        buffer :+= cnt
        if (totalDemand > 0) {
          val (use,keep) = buffer.splitAt(totalDemand.toInt)
          buffer = keep
          use foreach onNext
        }
      }
    }
  }

  override def postStop() = tick.cancel()
}

我的问题是我不知道如何使用它作为源。

My problem is that I don't know how to use it as source.

我尝试了以下操作:

val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString))
  optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() {
    case Some(upgrade) =>
      complete(
        upgrade.handleMessagesWithSinkSource(Sink.ignore,source))
    case None =>
      reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection)
  }

但是,当我与客户端连接时,会收到以下ClassCastException:java.lang.ClassCastException:java.lang.Integer无法转换为scala.runtime.Nothing $

But when I connect with a client I get the following ClassCastException: java.lang.ClassCastException: java.lang.Integer cannot be cast to scala.runtime.Nothing$

如果我将Source更改为:

If I change the Source to:

val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt()))
      .filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString))

运行正常。

我在点点滴滴之间都在挣扎,所以希望您能带领我朝正确的方向前进。

I struggling a bit connecting the dots, so hopefully you can lead me in the correct direction.

推荐答案

我尝试了您的示例,能够重现该问题。我只进行了一项更改来解决此问题。这加上了type参数,现在变得有意义了,因为在akka流中的某个地方,有一个像
elem.asInstanceOf [T] 这样的代码。因此,当actorPublisher中缺少该类型时,该类型将被推断为 Nothing

I tried your example and was able to reproduce the problem. I made only one change to fix the problem. That is added the type parameter and it makes sense now because somewhere in the akka stream, there is a code like elem.asInstanceOf[T]. So, when the type is missing from the actorPublisher, then the type is inferred as Nothing

val source = Source.actorPublisher[Int](Props[TickActor]).map(i => TextMessage(i.toString))

这篇关于Actorpublisher作为handleMessagesWithSinkSource中的源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆