为什么Akka Persisence Query Read Journal不检索我的事件? [英] Why doesn't the Akka Persisence Query Read Journal retrieve my events?

查看:92
本文介绍了为什么Akka Persisence Query Read Journal不检索我的事件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在理解Akka Persistence Query时遇到问题,尤其是方法 eventsByTag ,因为它的行为不像我期望的那样。

I have problems understanding Akka Persistence Query, especially the method eventsByTag since it doesn't behave like I expect.

class我称为一个开始监听具有特定标记的持久事件的类。

In my main class I call a class that starts listening to any events being persisted with a certain tag.

class CassandraJournal(implicit val system: ActorSystem) {

 def engageStreaming = {
   val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
   implicit val mat = ActorMaterializer()

   readJournal.eventsByTag("account", Offset.noOffset)
     .runForeach { event => println(event) }
   }
}

每当我启动服务器并我的事件存储为空,并且我坚持了我的第一个事件(通过调用Akka HTTP内置的http服务),该事件确实被打印了。但是,当我重新启动服务器并且事件存储中已经有事件时,新的持久事件将不会被打印。

Whenever I start my server and my event store is empty and I persist my first event (by calling a http service, built in Akka HTTP), the event indeed gets printed. However, when I restart the server and there are already events in the event store, the new persisted events won't get printed.

对此有解释吗?我很难弄清为什么会这样。

Is there an explanation for this? I have a hard time figuring out why this is happening.

编辑

我正在使用的事件存储是Cassandra。这是PersistentActor(我不使用事件适配器来标记事件,只需将事件包装在Tagged()周围即可)

The event store I'm using is Cassandra. Here is the PersistentActor (I am not using an Event Adapter to tag the events, just wrap them around a Tagged())

class Account(id: UUID) extends PersistentActor {

  override def receiveRecover: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      println("Creating checkings account")
  }

  override def receiveCommand: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
        val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
        sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
      }

  }

  def updateState(evt: Event): Unit = {
  }

  override def persistenceId: String = s"account-$id"
}


推荐答案

如果 receiveRecover 没有进行必要的状态恢复工作,则持久性将无法正常工作。我建议在 receiveRecover 中放入一些基本的状态恢复逻辑,并让您的 updateState 方法也覆盖事件案例。

With receiveRecover not doing the necessary state recovery work, persistence wouldn't work properly. I would suggest putting some basic state recovery logic in receiveRecover and have your updateState method cover also tagged event cases.

我在状态恢复逻辑与以下类似的应用程序中使用了 eventsByTag ,它在重新启动和恢复中均能正常工作

I used eventsByTag in an app with state recovery logic similar to the following and it worked fine on both fresh start and recovery.

def updateState(e: Any): Unit = e match {
  case evt: Event =>
    state = state.updated(evt)
  case Tagged(evt: Event, _) =>
    state = state.updated(evt)
}

...

override def receiveRecover: Receive = {
  case evt: Event => updateState(evt)
  case taggedEvt: Tagged => updateState(taggedEvt)
}

这篇关于为什么Akka Persisence Query Read Journal不检索我的事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆