Best way to query all documents from a mongodb collection in a reactive way w/out flooding RAM

Problem Description

I want to query all the documents in a collection in a reactive way. The collection.find() method of the mongodb nodejs driver returns a cursor that fires events for each document found in the collection. So I made this:

const giant_query = (db) => {
    var req = db.collection('mycollection').find({});
    return Rx.Observable.merge(Rx.Observable.fromEvent(req, 'data'),
                               Rx.Observable.fromEvent(req, 'end'),
                               Rx.Observable.fromEvent(req, 'close'),
                               Rx.Observable.fromEvent(req, 'readable'));
};

It will do what I want: fire for each document, so I can treat them in a reactive way, like this:

Rx.Observable.of('').flatMap(giant_query).do(some_function).subscribe()

I could query the documents in packets of tens, but then I'd have to keep track of an index number each time the observable stream fires, and I'd have to build an observable loop, which I'm not sure is possible or the right way to do it.
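For what it's worth, the skip/limit bookkeeping described above can be sketched without Rx. Here `fetchPage` is a stand-in for something like `db.collection('mycollection').find({}).skip(i).limit(n).toArray()`, mocked with an in-memory array so the sketch is self-contained:

```javascript
// Sketch of batched pagination with an explicit index (mocked, no MongoDB).
// fetchPage simulates db.collection(...).find().skip(i).limit(n).toArray().
const docs = Array.from({ length: 25 }, (_, i) => ({ _id: i }));
const fetchPage = async (skip, limit) => docs.slice(skip, skip + limit);

async function processAll(batchSize, handle) {
  let index = 0;
  for (;;) {
    const batch = await fetchPage(index, batchSize);
    if (batch.length === 0) break;   // no more documents
    handle(batch);                   // process one packet at a time
    index += batch.length;           // keep track of the index manually
  }
  return index;                      // total number of documents seen
}
```

Only one batch is in memory at a time, which is exactly the property the event-based cursor lacks.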

The problem with this cursor is that I don't think it does things in packets. It'll probably fire all the events in a short period of time, therefore flooding my RAM. Even if I buffer some events in packets using Observable's buffer, the events and events data (the documents) are going to be waiting on RAM to be manipulated.

What's the best way to deal with it in a reactive way?

Recommended Answer

I'm not an expert on mongodb, but based on the examples I've seen, this is a pattern I would try.

I've omitted the events other than data, since throttling that one seems to be the main concern.

var cursor = db.collection('mycollection').find({});

const cursorNext = new Rx.BehaviorSubject('next');  // signal first batch then wait
const nextBatch = () => {
  // note: in the official driver cursor.hasNext() returns a Promise,
  // so a real implementation would need to await it
  if (cursor.hasNext()) {
    cursorNext.next('next');
  }
};

cursorNext
  .switchMap(() =>                             // wait for cursorNext to signal
     Rx.Observable.fromPromise(cursor.next())  // get a single doc
       .repeat()                               // get another
       .takeWhile(() => cursor.hasNext())      // stop taking if out of data
       .take(batchSize)                        // until full batch
       .toArray()                              // combine into a single emit
  )
  .map(docsBatch => {
    // do something with the batch
    // return docsBatch or a modified docsBatch
  })
  ... // other operators?
  .subscribe(x => {
    ...
    nextBatch();
  });

I'm trying to put together a test of this Rx flow without mongodb, in the meantime this might give you some ideas.
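As a side note beyond the original answer: recent versions of the official driver expose cursors as async iterables, so a `for await` loop pulls one document at a time instead of reacting to pushed events, which sidesteps the flooding problem entirely. A sketch with the cursor mocked as an async generator:

```javascript
// Pull-based consumption sketch: modern MongoDB driver cursors are async
// iterable, so `for await` fetches documents on demand rather than having
// them all pushed at once. The cursor is mocked here with an async generator.
async function* mockCursor() {
  for (let i = 0; i < 5; i++) {
    yield { _id: i };   // in real code, the driver fetches batches lazily
  }
}

async function consume(cursor, handle) {
  let count = 0;
  for await (const doc of cursor) {
    handle(doc);        // each doc is handled before the next is requested
    count++;
  }
  return count;
}
```

Because the loop body finishes before the next document is requested, memory usage stays bounded by one document (plus the driver's internal batch) rather than the whole collection.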
