Properly batch nested promises in Node


Question

I'm running a knex seed in Node and need to batch an additional query to my database due to restrictions on my server. I'm starting to get the hang of promises and async/await, but I'm having trouble getting it to work at several levels deep (what's throwing me off in particular at this point is that it seems to interfere with the batching in a way that I can't quite make sense of). My seed file looks like this:

exports.seed = async function(knex) {
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) {
    return knex('events')
      .where({location: location})
      .first()
      .then(result => { return result['id']; })
      .finally(() => { knex.destroy() })
  }

  function createImage(row, event_id) {
    return {
      name: row[4],
      event_id: event_id
    }
  };

  async function run_query(line) {
      let row = line.split(',');
      let event_id = await get_event_id(row[0]);
      return createImage(row, event_id);
  };

  async function run_batch(batch) {
      return Promise.all(batch.map(run_query));
  }

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1,60); // skip csv header, then run first 59 lines

  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));

  console.log(_.flatten(images).length);

};

My database can handle 30 queries at a time. Everything resolves properly if I run a single batch using .slice(1,30) on the line where lines is defined. But running with 60 as above gives me ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections.

The script completes if I change the content of run_batch to return batch.map(run_query), and it returns the correct number of entries (so it seems to be batching properly). But then the Promises are still pending. What am I missing, and is there a more elegant way to do this?

Answer

On this line:

let images = await Promise.all(batches.map(run_batch));

You are trying to run ALL the batches in parallel which is defeating your chunking entirely.

You could use a regular for loop with await instead of the .map() so you run a batch, wait for it to finish, then run the next batch.

let allResults = [];
for (let batch of batches) {
    // wait for this batch of queries to finish before starting the next one
    let images = await run_batch(batch);
    allResults.push(...images);
}
console.log(allResults);
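
With this change, at most one batch of 30 queries is in flight at a time, which stays within your server's connection limit. It also explains the pending promises you saw: since run_query is async, batch.map(run_query) returns an array of promises without waiting on them, so the seed returns while the queries are still outstanding. Awaiting Promise.all(...) on that array (which run_batch does, and which the loop above awaits one batch at a time) is what makes the script wait for the results.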

FYI, you might benefit from any number of functions people have written for processing a large array with no more than N requests in flight at the same time. These do not require you to manually break the data into batches. Instead, they monitor how many requests are in-flight at the same time, start up your desired number of requests, and as one finishes, they start another one, collecting the results for you (a minimal sketch of the idea appears after the list below):

runN(fn, limit, cnt, options): Loop through an API on multiple requests

pMap(array, fn, limit): Make several requests to an API that can only handle 20 requests at a time

rateLimitMap(array, requestsPerSec, maxInFlight, fn): Proper async method for max requests per second

mapConcurrent(array, maxConcurrent, fn): Promise.all() consumes all my RAM

There are also equivalents in the Bluebird promise library and the Async promise library.
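
For illustration, here is a minimal sketch of how a limiter like this can work. The name and signature follow mapConcurrent(array, maxConcurrent, fn) from the list above, but the body is an assumption written for this article, not the exact code behind those links:

function mapConcurrent(array, maxConcurrent, fn) {
  // Sketch only: run fn on every item of array, keeping at most
  // maxConcurrent calls in flight at any one time.
  return new Promise((resolve, reject) => {
    const results = new Array(array.length);
    let nextIndex = 0; // next item to start
    let inFlight = 0;  // calls currently running
    let finished = 0;  // calls that have completed

    if (array.length === 0) return resolve(results);

    function runNext() {
      // fill available slots up to the concurrency limit
      while (inFlight < maxConcurrent && nextIndex < array.length) {
        const i = nextIndex++;
        inFlight++;
        Promise.resolve(fn(array[i], i)).then(result => {
          results[i] = result;
          inFlight--;
          finished++;
          if (finished === array.length) {
            resolve(results);
          } else {
            runNext(); // a slot opened up; start the next item
          }
        }, reject);
      }
    }
    runNext();
  });
}

With a helper like that, the seed above would not need manual chunking at all; something like let images = await mapConcurrent(lines, 30, run_query); would keep at most 30 queries running at once.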
