播放+ ReactiveMongo:上限收藏和可尾光标 [英] Play + ReactiveMongo: capped collection and tailable cursor

查看:108
本文介绍了播放+ ReactiveMongo:上限收藏和可尾光标的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在将Play Framework与Scala,Akka和ReactiveMongo一起使用.我想将MongoDB中的集合用作循环队列.几个参与者可以在其中插入文件;一旦有这些文件,一个参与者就会检索这些文件(一种发布-订阅系统). 我正在使用上限集合和可尾光标.每次检索某些文档时,我都必须运行命令EmptyCapped刷新上限的集合(无法从中删除元素),否则我总是检索同一文档.有替代解决方案吗?例如,有没有一种方法可以在不删除元素的情况下滑动光标?还是最好不要在我的情况下使用上限收集?

I'm using Play Framework with Scala, Akka and ReactiveMongo. I want to use a collection in MongoDB as a circular queue. Several actors can insert documents into it; one actor retrieves these documents as soon as they're available (a sort of publish-subscribe system). I'm using capped collections and tailable cursor. Everytime I retrieve some documents I have to run the command EmptyCapped to flush the capped collection (it's not possible to REMOVE elements from it) otherwise I retrieve always the same document. is there an alternative solution? for example is there a way to slide a cursor without removing elements? or it's better not to use capped collection in my case?

object MexDB {

def db: reactivemongo.api.DB = ReactiveMongoPlugin.db
val size: Int = 10000

// creating capped collection
val collection: JSONCollection = {

    val c = db.collection[JSONCollection]("messages")

    val isCapped = coll.convertToCapped(size, None)

    Await.ready(isCapped, Duration.Inf)

    c
}

def insert(mex: Mex) = {

    val inserted = collection.insert(mex)

    inserted onComplete {
      case Failure(e) =>
        Logger.info("Error while inserting task: " + e.getMessage())
        throw e

      case Success(i) =>
        Logger.info("Successfully inserted task")
    }

}


def find(): Enumerator[Mex] = {

  val cursor: Cursor[Mex] = collection
    .find(Json.obj())
    .options(QueryOpts().tailable.awaitData)
    .cursor[Mex]

    // meaning of maxDocs ???
    val maxDocs = 1
    cursor.enumerate(maxDocs)
}


def removeAll() = {
    db.command(new EmptyCapped("messages"))
}

}

/*** part of receiver actor code ***/

// inside preStart
val it = Iteratee.fold[Mex, List[Mex]](Nil) {
    (partialList, mex) => partialList ::: List(mex)
}

// Inside "receive" method
case Data =>

  val e: Enumerator[Mex] = MexDB.find()

  val future = e.run(it)

  future onComplete {
    case Success(list) =>
      list foreach { mex =>
        Logger.info("Mex: " + mex.id)
      }
      MexDB.removeAll()
      self ! Data

    case Failure(e) => Logger.info("Error:  "+ e.getMessage())
  }

推荐答案

在每个找到的文档为maxDocs = 1之后,可尾光标都关闭.要无限期打开它,您应该忽略此限制.

Your tailable cursor is closed after each found doc as the maxDocs = 1. To keep it open indefinitely you should omit this limit.

使用awaitData,仅当您明确关闭RM时,才会调用.onComplete.

With awaitData, .onComplete will only be called if you explicitly shutdown RM.

您需要使用光标中的某些流功能,例如.enumerate并处理每个新步骤/结果.参见 https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/

You need to use some streaming function from the cursor, such as .enumerate and process each new step/result. See https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/

这篇关于播放+ ReactiveMongo:上限收藏和可尾光标的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆