异步子任务的异步光标迭代 [英] Async Cursor Iteration with Asynchronous Sub-task

查看:144
本文介绍了异步子任务的异步光标迭代的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想对没有数字键的mongoDB集合执行迭代( _id )。该集合只有随机字符串作为_id,并且集合的大小很大,因此使用 .toArray()在RAM上加载整个文档不是一个可行的选项。另外,我想对每个元素执行异步任务。使用 .map() .each() .forEach()由于任务的异步性质而受到限制。我尝试使用那些提到的方法运行任务,但它确实与异步任务冲突,返回等待的承诺而不是正确的结果。

I want to perform an iteration over a mongoDB collection w/o numeric key(_id). The collection only has random strings as an _id, and the size of the collection is massive, thus loading up the whole documents on RAM using .toArray() is not a viable option. plus I want to perform asynchronous task on each element. the usage of .map() or .each(), .forEach() is limited because of the asynchronous nature of the task. I tried to run the task with those mentioned methods but it did of course conflicted with asynchronous task, returned pending promises instead of proper results.

示例

async function dbanalyze(){

  let cursor = db.collection('randomcollection').find()
  for(;;){
    const el = cursor.hasNext() ? loaded.next() : null;
    if(!cursor) break
    await performAnalyze(cursor) // <---- this doesn't return a document but just a cursor object
  }

}

如何仅使用 for()来迭代mongoDB集合

推荐答案

Cursor.hasNext() 方法也是异步 ,所以你需要等待 Cursor.next也是如此() 。因此,实际的循环用法确实应该是,而

async function dbanalyze(){

  let cursor = db.collection('randomcollection').find()
  while ( await cursor.hasNext() ) {  // will return false when there are no more results
    let doc = await cursor.next();    // actually gets the document
    // do something, possibly async with the current document
  }

}

如评论中所述,最终 Cursor.hasNext() 当光标返回 false 实际上已经耗尽了, Cursor.next() 实际上是从游标中检索每个值的东西。当 hasNext() false break 循环c $ c>,但它更自然地适用于

As noted in the comments, eventually Cursor.hasNext() will return false when the cursor is actually depleted, and the Cursor.next() is the thing that is actually retrieving each value from the cursor. You could do other structures and break the loop when hasNext() is false, but it more naturally lends itself to a while.

这些仍然是异步,所以你需要等待每个人的承诺解决方案,这是你错过的主要事实。

These are still "async", so you need to await the promise resolution on each, and that was the main fact you were missing.

As for Cursor.map() ,那么你可能也错过了在提供的函数上用 async 标记的标记:

As for Cursor.map(), then you are probably missing the point that it can be marked with an async flag on the provided function as well:

 cursor.map( async doc => {                   // We can mark as async
    let newDoc = await someAsyncMethod(doc);  // so you can then await inside
    return newDoc;
 })

但你实际上仍然希望在某处迭代,除非你可以使用 .pipe()到其他输出目的地。

But you still actually want to "iterate" that somewhere, unless you can get away with using .pipe() to some other output destination.

此外, async / await 标志也会生成 Cursor.forEach() 更实用,因为它的一个常见缺陷是无法简单地处理内部异步调用,但是使用这些标志,您现在可以轻松地这样做,但不可否认,因为您必须使用回调,您可能希望将其包装在Promise中:

Also the async/await flags also make Cursor.forEach() "more practical again", as it's one common flaw was not being able to simply handle an "inner" asynchronous call, but with these flags you can now do so with ease, though admittedly since you must use a callback, you probably want to wrap this in a Promise :

await new Promise((resolve, reject) => 
  cursor.forEach(
    async doc => {                              // marked as async
      let newDoc = await someAsyncMethod(doc);  // so you can then await inside
      // do other things
    },
    err => {
      // await was respected, so we get here when done.
      if (err) reject(err);
      resolve();
    }
  )
);

当然,总是有办法用回调或普通的Promise实现来应用它,但它是 async / await 的糖实际上使它看起来更干净。

Of course there has always been ways to apply this with either callbacks or plain Promise implementations, but it's the "sugar" of async/await than actually makes this look much cleaner.

最喜欢的版本使用 AsyncIterator ,现在已在NodeJS v10及更高版本中启用。这是一种更清晰的迭代方式

And the favorite version uses AsyncIterator which is now enabled in NodeJS v10 and upwards. It's a much cleaner way to iterate

async function dbanalyze(){

  let cursor = db.collection('randomcollection').find()
  for await ( let doc of cursor ) {
    // do something with the current document
  }    
}

在某种程度上回到最初询问使用<的问题 for 循环,因为我们可以在此处执行 for-await-of 语法,以支持支持正确接口的iterable。并且光标确实支持此接口。

Which "in a way" comes back to what the question originally asked as to using a for loop since we can do the for-await-of syntax here fore supporting iterable which supports the correct interface. And the Cursor does support this interface.

如果你' re curios,这是我前段时间制作的一个列表,用于演示各种游标迭代技术。它甚至包括来自生成器功能的异步迭代器的案例:

If you're curios, here's a listing I cooked up some time ago to demonstrate various cursor iteration techniques. It even includes a case for Async Iterators from a generator function:

const Async = require('async'),
      { MongoClient, Cursor } = require('mongodb');

const testLen = 3;
(async function() {

  let db;

  try {
    let client = await MongoClient.connect('mongodb://localhost/');

    let db = client.db('test');
    let collection = db.collection('cursortest');

    await collection.remove();

    await collection.insertMany(
      Array(testLen).fill(1).map((e,i) => ({ i }))
    );

    // Cursor.forEach
    console.log('Cursor.forEach');
    await new Promise((resolve,reject) => {
      collection.find().forEach(
        console.log,
        err => {
          if (err) reject(err);
          resolve();
        }
      );
    });

    // Async.during awaits cursor.hasNext()
    console.log('Async.during');
    await new Promise((resolve,reject) => {

      let cursor = collection.find();

      Async.during(
        (callback) => Async.nextTick(() => cursor.hasNext(callback)),
        (callback) => {
          cursor.next((err,doc) => {
            if (err) callback(err);
            console.log(doc);
            callback();
          })
        },
        (err) => {
          if (err) reject(err);
          resolve();
        }
      );

    });

    // async/await allows while loop
    console.log('async/await while');
    await (async function() {

      let cursor = collection.find();

      while( await cursor.hasNext() ) {
        let doc = await cursor.next();
        console.log(doc);
      }

    })();

    // await event stream
    console.log('Event Stream');
    await new Promise((end,error) => {
      let cursor = collection.find();

      for ( let [k,v] of Object.entries({ end, error, data: console.log }) )
        cursor.on(k,v);
    });

    // Promise recursion
    console.log('Promise recursion');
    await (async function() {

      let cursor = collection.find();

      function iterate(cursor) {
        return cursor.hasNext().then( bool =>
          (bool) ? cursor.next().then( doc => {
            console.log(doc);
            return iterate(cursor);
          }) : Promise.resolve()
        )
      }

      await iterate(cursor);

    })();

    // Uncomment if node is run with async iteration enabled
    // --harmony_async_iteration


    console.log('Generator Async Iterator');
    await (async function() {

      async function* cursorAsyncIterator() {
        let cursor = collection.find();

        while (await cursor.hasNext() ) {
          yield cursor.next();
        }

      }

      for await (let doc of cursorAsyncIterator()) {
        console.log(doc);
      }

    })();


    // This is supported with Node v10.x and the 3.1 Series Driver
    await (async function() {

      for await (let doc of collection.find()) {
        console.log(doc);
      }

    })();

    client.close();

  } catch(e) {
    console.error(e);
  } finally {
    process.exit();
  }

})();

这篇关于异步子任务的异步光标迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆