如何改善这个嵌套的异步循环? [英] How can I improve this nested asynchronous loop?

查看:62
本文介绍了如何改善这个嵌套的异步循环?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问题如下.我有一个像这样的对象数组:

The problem is as follows. I have an array of objects like so:

let myObj = [
{'db1':['doc1','doc2','doc3']},
{'db2':['doc4','doc5']},
{'db3':['doc7','doc8','doc9','doc10']}
]

请注意,这是我决定用于此问题的数据结构,如果可以改善整体实现,则可以更改.实际的db和doc ID是从格式如下的文本文件中读取的.

Note that this is a data structure I decided to use for the problem and can be changed if it can improve the overall implementation. The actual db and doc Ids are read from a text file formatted as below.

"db1","doc1"
"db1","doc2"
...

我的应用程序将同步遍历数据库列表.在每个数据库迭代中,将有一个文档列表的异步迭代.每个文档都将被检索,处理并保存回数据库.

My app will iterate through the db list synchronously. Inside each db iteration, there will be an asynchronous iteration of the document list. Each document will be retrieved, processed and saved back to the db.

所以基本上在任何给定的实例中:一个数据库,但有多个文档.

So basically at any given instance: one db, but multiple documents.

我有一个像上面这样的有效实现方式:

I have a working implementation of the above like so:

dbIterator :用于迭代dbs的同步外部循环.传递给 docIterator 的回调将触发下一次迭代.

dbIterator: the synchronous outer loop to iterate dbs. The callback passed to docIterator will trigger the next iteration.

const dbIterator = function (x) {
  if (x < myObj.length) {
    let dbObj = myObj[x];
    let dbId = Object.keys(dbObj)[0];
    docIterator(dbId, dbObj[dbId], ()=>merchantIterator(x+1));
  } else {
    logger.info('All dbs processed');
  }
};

docIterator :迭代文档的异步循环.所有文档处理完毕后,将调用回调 cb .通过 docsProcessed docsToBeProcessed 变量

docIterator: the asynchronous loop to iterate docs. The callback cb is called after all documents are processed. This is tracked via the docsProcessed and docsToBeProcessed variables

const docIterator = function(dbId, docIds, cb){
  //create connection
  targetConnection = //some config for connection to dbId
  let docsProcessed = 0;
  let docsToBeProcessed = docIds.length;

  //asynchronous iteration of documents
  docIds.forEach((docId)=>{
    getDocument(docId, targetConnection).then((doc)=>{
      //process document
      processDoc(doc, targetConnection).then(()=>{
        //if processing is successful
        if (++docsProcessed >= docsToBeProcessed) {
          cb();
        }
      })
       //if processing fails
      .catch((e) => {
        logger.error('error when processing document');
        if (++docsProcessed >= docsToBeProcessed) {
          cb();
        }
      });

    }).catch((e)=>{
      logger.error('error when retrieving document: ');
      if (++docsProcessed >= docsToBeProcessed) {
        cb();
      }
    });
  });
};

processDoc:用于处理和保存单个文档.这将返回一个承诺,该承诺将在文档处理完成后得到解决,依次处理 docsProcessed 和有条件地( docsProcessed> = docsToBeProcessed )调用传递回的回调docIterator

processDoc: used to process and save an individual document. This returns a promise that gets resolved when the document processing is done which in turn increments docsProcessed and conditionally (docsProcessed >= docsToBeProcessed) calls the call back passed into docIterator

const processDoc = function(doc, targetConnection) {

  return new Promise(function(resolve, reject) {
    if(shouldThisDocBeProcessed(doc){
      let updatedDoc = logic(doc);
      targetConnection.insert(updatedDoc, updatedDoc._id,
        function (error, response) {
          if (!error){
            logger.info('updated successfully');
          } else {
            logger.error('error when saving doc');
          }
          resolve();
        }
      );
    } else {
      resolve();
    }
  })
};

这按预期工作,但对我来说,此实现次佳且混乱.我很确定可以对此进行改进,最重要的是有机会更好地理解和实现针对同步和异步问题的解决方案.

This works as expected but for me this implementation is sub-optimal and messy. I'm pretty sure this can be improved upon and most importantly a chance to better understand and implement solutions to synchronous and asynchronous problems.

我愿意接受建设性的批评.那么如何改善呢?

I'm open to constructive criticism. So how can this be improved?

推荐答案

也许是这样?

可以在此处找到节气门的示例实现.

//this should be available in both modules so you can filter
const Fail = function(details){this.details=details;};
// docIterator(dbId,docIds)
// .then(
//   results =>{
//     const failedResults = results.filter(
//       result => (result&&result.constructor)===Failed
//     );
//     const successfullResults = results.filter(
//       result => (result&&result.constructor)!==Failed
//     );
//   }
// )

const docIterator = function(dbId, docIds){
  //create connection
  // targetConnection = //some config for connection to dbId
  let docsProcessed = 0;
  let docsToBeProcessed = docIds.length;
  //asynchronous iteration of documents
  docIds.map(
    docId =>
      new Promise(
        (resolve,reject) =>
          //if you use throttled you can do:
          // max10(
          //   ([docId,targetConnection])=>
          //     getDocument(docId,targetConnection)
          // )([docId, targetConnection])
          getDocument(docId, targetConnection)
      )
      .then(
        doc =>
          //if this returns nothing then maybe you'd like to return the document
          processDoc(doc, targetConnection)
          .then(
            _ => doc
          )
      )
      .catch(
        err => new fail([err,docId])
      )

  )
};

这篇关于如何改善这个嵌套的异步循环?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆