SourceQueueWithComplete 的 Akka 流协方差 [英] Akka Streams Covarience for SourceQueueWithComplete

查看:19
本文介绍了SourceQueueWithComplete 的 Akka 流协方差的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个从 SupervisorActor 内部创建的 Actor,这个 Actor 负责将它获取的消息推送到流中.这是演员:

I have an Actor which is created from within a SupervisorActor and this Actor is responsible for pushing the messages that it gets into a stream. Here is the Actor:

class KafkaPublisher[T <: KafkaMessage: ClassTag] extends Actor {

  implicit val system = context.system
  val log = Logging(system, this.getClass.getName)

  override final def receive = {
    case ProducerStreamActivated(_, stream: SourceQueueWithComplete[T]) =>
      log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} << ActorPath >> ${self.path} <<")
      context.become(active(stream))

    case other =>
      log.warning("KafkaPublisher got some unknown message while producing: " + other)
  }

  def active(stream: SourceQueueWithComplete[T]): Receive = {
    case msg: T =>
      log.info(s"Got Message >> $msg << Pushing it to stream $stream")
      stream.offer(msg)

    case other =>
      log.warning("KafkaPublisher got the unknown message while producing: " + other)
  }
}
object KafkaPublisher {

  def props[T <: KafkaMessage: ClassTag] =
    Props(new KafkaPublisher[T])
}

可以看出,这个 Actor 在通用消息类型上工作,因为 SupervisorActor 负责创建这个通用 Actor 的具体实例.

As it can be seen that this Actor works on a generic message type as the SupervisorActor is responsible for creating concrete instances of this generic Actor.

trait Event
object Event {
  case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T]) extends Event
}

trait KafkaMessage
object KafkaMessage {

  case class DefaultMessage(message: String, timestamp: DateTime) extends KafkaMessage {
    def this() = this("DEFAULT-EMPTY-MESSAGE", DateTime.now(DateTimeZone.UTC))
  }

  case class DefaultMessageBundle(messages: Seq[DefaultMessage], timeStamp: DateTime) extends KafkaMessage {
    def this() = this(Seq.empty, DateTime.now(DateTimeZone.UTC))
  }
}

显然,我收到以下错误:

Obviously, I get the following error:

Error:(17, 45) pattern type is incompatible with expected type;
 found   : akka.stream.scaladsl.SourceQueueWithComplete[T]
 required: akka.stream.scaladsl.SourceQueueWithComplete[com.eon.pm.messages.KafkaMessage]
Note: T <: com.my.project.messages.KafkaMessage, but trait SourceQueueWithComplete is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
    case ProducerStreamActivated(_, stream: SourceQueueWithComplete[T]) =>

SourceQueueWithComplete 不是协变的.我怎样才能减轻这种情况?有什么建议吗?

The SourceQueueWithComplete is not Covariant. How can I mitigate this? Any suggestions?

推荐答案

like:

class KafkaPublisher extends Actor {

  implicit val system = context.system
  val log = Logging(system, this.getClass.getName)

  override final def receive = {
    case ProducerStreamActivated(_, stream) =>
      log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} << ActorPath >> ${self.path} <<")
      context.become(active(stream))

    case other =>
      log.warning("KafkaPublisher got some unknown message while producing: " + other)
  }

  def active(stream: SourceQueueWithComplete[KafkaMessage]): Receive = {
    case msg: DefaultMessage =>
      log.info(s"Got Message >> $msg << Pushing it to stream $stream")
      stream.offer(msg)

    case msg: DefaultMessageBundle =>
      log.info(s"Got Message >> $msg << Pushing it to stream $stream")
      stream.offer(msg)

    case other =>
      log.warning("KafkaPublisher got the unknown message while producing: " + other)
  }
}
object KafkaPublisher {

  def props = Props(new KafkaPublisher)
}

trait Event
object Event {
  case class ProducerStreamActivated(kafkaTopic: String, stream: SourceQueueWithComplete[KafkaMessage]) extends Event
}

trait KafkaMessage
object KafkaMessage {

  case class DefaultMessage(message: String, timestamp: DateTime) extends KafkaMessage {
    def this() = this("DEFAULT-EMPTY-MESSAGE", DateTime.now(DateTimeZone.UTC))
  }

  case class DefaultMessageBundle(messages: Seq[DefaultMessage], timeStamp: DateTime) extends KafkaMessage {
    def this() = this(Seq.empty, DateTime.now(DateTimeZone.UTC))
  }
}

这篇关于SourceQueueWithComplete 的 Akka 流协方差的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆