Actorpublisher作为handleMessagesWithSinkSource中的源 [英] Actorpublisher as source in handleMessagesWithSinkSource
问题描述
我是AKKA Streams的新手。 (使用Akka v 2.4.4)
我正在尝试创建一个Websocket,该Websocket可以将新的通知推送到订阅的客户端。我的策略是实现一个ActorPublisher,稍后我可以将其发送到客户端,然后再将其推送给客户端。
I'm new to AKKA Streams. (Using Akka v 2.4.4) I am trying to create a Websocket which can push new notifications to subscribed clients. My strategy is to implement a ActorPublisher, which I later can send a message to, and then get it pushed to clients.
首先,我复制了一个ActorPublisher的示例。 :
To get started I copied an example of a ActorPublisher:
case class Tick()
class TickActor extends ActorPublisher[Int] {
import scala.concurrent.duration._
implicit val ec = context.dispatcher
val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())`
var cnt = 0
var buffer = Vector.empty[Int]
override def receive: Receive = {
case Tick() => {
cnt = cnt + 1
if (buffer.isEmpty && totalDemand > 0) {
onNext(cnt)
}
else {
buffer :+= cnt
if (totalDemand > 0) {
val (use,keep) = buffer.splitAt(totalDemand.toInt)
buffer = keep
use foreach onNext
}
}
}
}
override def postStop() = tick.cancel()
}
我的问题是我不知道如何使用它作为源。
My problem is that I don't know how to use it as source.
我尝试了以下操作:
val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString))
optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() {
case Some(upgrade) =>
complete(
upgrade.handleMessagesWithSinkSource(Sink.ignore,source))
case None =>
reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection)
}
但是,当我与客户端连接时,会收到以下ClassCastException:java.lang.ClassCastException:java.lang.Integer无法转换为scala.runtime.Nothing $
But when I connect with a client I get the following ClassCastException: java.lang.ClassCastException: java.lang.Integer cannot be cast to scala.runtime.Nothing$
如果我将Source更改为:
If I change the Source to:
val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt()))
.filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString))
运行正常。
我在点点滴滴之间都在挣扎,所以希望您能带领我朝正确的方向前进。
I struggling a bit connecting the dots, so hopefully you can lead me in the correct direction.
推荐答案
我尝试了您的示例,能够重现该问题。我只进行了一项更改来解决此问题。这加上了type参数,现在变得有意义了,因为在akka流中的某个地方,有一个像
elem.asInstanceOf [T]
这样的代码。因此,当actorPublisher中缺少该类型时,该类型将被推断为 Nothing
I tried your example and was able to reproduce the problem. I made only one change to fix the problem. That is added the type parameter and it makes sense now because somewhere in the akka stream, there is a code like
elem.asInstanceOf[T]
. So, when the type is missing from the actorPublisher, then the type is inferred as Nothing
val source = Source.actorPublisher[Int](Props[TickActor]).map(i => TextMessage(i.toString))
这篇关于Actorpublisher作为handleMessagesWithSinkSource中的源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!