Properly batch nested promises in Node

Problem description

I'm running a knex seed in Node and need to batch an additional query to my database due to restrictions on my server. I'm starting to get the hang of promises and async/await, but I'm having trouble getting it to work at several levels deep (what's throwing me off in particular at this point is that it seems to interfere with the batching in a way that I can't quite make sense of). My seed file looks like this:

exports.seed = async function(knex) {
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) {
    return knex('events')
      .where({location: location})
      .first()
      .then(result => { return result['id']; })
      .finally(() => { knex.destroy() })
  }

  function createImage(row, event_id) {
    return {
      name: row[4],
      event_id: event_id
    }
  };

  async function run_query(line) {
    let row = line.split(',');
    let event_id = await get_event_id(row[0]);
    return createImage(row, event_id);
  }

  async function run_batch(batch) {
    return Promise.all(batch.map(run_query));
  }

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1,60); // skip csv header, then run first 59 lines

  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));

  console.log(_.flatten(images).length);

};

My database can handle 30 queries at a time. Everything resolves properly if I run a single batch using .slice(1,30) on the line where lines is defined. But running with 60 as above gives me ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections.

The script completes if I change the content of run_batch to return batch.map(run_query), and it returns the correct number of entries (so it seems to be batching properly). But then the Promises are still pending. What am I missing, and is there a more elegant way to do this?
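
A standalone sketch of what happens in that case (not the seed code itself): Promise.all does not unwrap promises nested inside arrays, so an async function returning batch.map(run_query) resolves with an array of still-pending promises.

async function inner() {
  return [Promise.resolve(1), Promise.resolve(2)]; // an array OF promises
}

Promise.all([inner()]).then(result => {
  // result is [[Promise, Promise]] -- the inner promises were never awaited
  console.log(result[0]);
});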

Recommended answer

On this line:

let images = await Promise.all(batches.map(run_batch));

You are trying to run ALL the batches in parallel, which defeats your chunking entirely.

You could use a regular for loop with await instead of the .map() so you run a batch, wait for it to finish, then run the next batch.

let allResults = [];
for (let batch of batches) {
    let images = await run_batch(batch);
    allResults.push(...images);
}
console.log(allResults);
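
Note that each run_batch call still fires its queries in parallel through the inner Promise.all; only the batches themselves now run one at a time, which keeps you at or below 30 simultaneous connections.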

FYI, you might benefit from any number of functions people have written for processing a large array with no more than N requests in flight at the same time. These do not require you to manually break the data into batches. Instead, they monitor how many requests are in-flight at the same time and they start up your desired number of requests and as one finishes, they start another one, collecting the results for you.
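
As a minimal sketch of what such a helper can look like (my own illustration, assuming the mapConcurrent name and signature used in the links below, not their exact code): it keeps at most maxConcurrent calls to fn in flight and resolves with the results in input order.

function mapConcurrent(array, maxConcurrent, fn) {
  return new Promise((resolve, reject) => {
    const results = new Array(array.length);
    let nextIndex = 0; // next item to start
    let inFlight = 0;  // calls currently pending
    let failed = false;

    function startNext() {
      // done when everything has been started and has finished
      if (nextIndex >= array.length && inFlight === 0) {
        return resolve(results);
      }
      // top up to the concurrency cap
      while (nextIndex < array.length && inFlight < maxConcurrent) {
        const i = nextIndex++;
        inFlight++;
        Promise.resolve(fn(array[i], i)).then(result => {
          results[i] = result;
          inFlight--;
          if (!failed) startNext();
        }, err => {
          failed = true;
          reject(err);
        });
      }
    }

    startNext();
  });
}

// usage with the seed's helpers -- no manual chunking required:
// let images = await mapConcurrent(lines, 30, run_query);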

runN(fn, limit, cnt, options): Loop through an API on multiple requests

pMap(array, fn, limit)

rateLimitMap(array, requestsPerSec, maxInFlight, fn): Proper async method for max requests per second

mapConcurrent(array, maxConcurrent, fn): Promise.all() consumed all my memory

There are also similar helpers built into the Bluebird promise library and the Async promise library.
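
With Bluebird, for example, the manual chunking collapses into a single call (a sketch assuming the same lines and run_query from the question; the concurrency option caps the number of in-flight queries):

const Promise = require('bluebird');

// at most 30 run_query calls are running at any one time
let images = await Promise.map(lines, run_query, { concurrency: 30 });
console.log(images.length);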
