Parsing large CSV and streaming rows of promises


Question

I'm getting a bit mixed up trying to stream a CSV, make an HTTP request for each row, and have everything execute and log to the console in the "proper" order. Ultimately, I think I'm not wrapping my promises right, or...?

const getUserByEmail = async (email) => {
  const encodedEmail = encodeURIComponent(email);

  try {
    const response = await http.get(`users?email=${encodedEmail}`);
    const userId = response.data.data[0] && response.data.data[0].id;

    return (userId ? userId : `${email} not found`);
  } catch (error) {
    console.error('get user error: ', error);
  }
};

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', rowCount => {
      console.log(`==> Parsed ${rowCount} rows from csv ...`);
    })

  await Promise.all(promises)
    .then(values => console.log(values))

  console.log('==> End of script')
};

run();

I'm attempting / expecting the code above to take each row of the csv, push each http call (a promise) to an array of promises, and have everything execute/log to console in the order I'm expecting.

This is my actual output:

==> Reading csv...
[]
==> End of script
==> Parsed 10 rows from csv ...

This is what I was expecting:

==> Reading csv...
==> Parsed 10 rows from csv ...
[
  QyDPkn3WZp,
  e75KzrqYxK,
  iqDXoEFMZy,
  PstouMRz3y,
  w188hLyeT6,
  g18oxMOy6l,
  8wjVJutFnh,
  fakeEmail@fakeDomain.com not found,
  QEHaG3cp7d,
  y8I4oX6aCe
]
==> End of script

The biggest issue for me is that anything is logging after "==> End of script", which indicates to me that I don't have a strong grasp of when/why all previous events are logging in the order that they are.

Ultimately (and I haven't gotten there yet) I'd also like to buffer/time these requests to 100 per minute, since otherwise I'll be rate-limited by this particular API.

Thanks!

Answer

The whole readStream chain, all the way down to await Promise.all(promises), runs synchronously. The data event fires asynchronously and populates promises on a later turn of the event loop, so promises is still an empty array when you call Promise.all; you are not waiting for the stream to end. You might want to put your logic in the end event instead, like this:

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', async rowCount => {
      await Promise.all(promises)
        .then(values => console.log(values))

      console.log('==> End of script')
    })
}
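To see why the original version printed [] before the rows were parsed, here is a minimal standalone sketch (my illustration, not part of the original answer): setImmediate stands in for a 'data' event that fires on a later event-loop turn.

```javascript
// Minimal repro of the bug (illustrative sketch): Promise.all receives the
// array as it exists at call time, and an empty array resolves immediately.
const promises = [];

// Stand-in for a stream 'data' event: it fires on a later event-loop turn.
setImmediate(() => promises.push(Promise.resolve('row 1')));

// promises is still [] here, so this resolves to [] right away,
// before the simulated 'data' event has pushed anything.
Promise.all(promises).then((values) => console.log(values)); // logs []
```

This is exactly the original run(): by the time await Promise.all(promises) is reached, no 'data' event has fired yet, so the awaited array is empty and "==> End of script" prints before the rows are even counted.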


Another, simpler way is to use an async iterator. readStream has a Symbol.asyncIterator you can use:

const run = async () => {
  console.log('==> Reading csv ...');

  let rowCount = 0
  const promises = []
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
  
  for await (let row of readStream) {
    rowCount++
    promises.push(getUserByEmail(row.email));
  }
    
  console.log(`==> Parsed ${rowCount} rows from csv ...`)

  await Promise.all(promises).then(console.log)

  console.log('==> End of script')
}

I would go further and limit the concurrency:

const run = async () => {
  console.log('==> Reading csv ...');

  const result = []
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
  
  for await (let row of readStream) {
    result.push(await getUserByEmail(row.email))
  }

  console.log(result)
  console.log('==> End of script')
}

If you want to increase the concurrency of an async iterator, then look at this post, but beware: the results can come back out of order when using this method.
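On the rate-limiting part of the question, one simple approach is to collect task functions instead of promises and start them in timed batches. This is a sketch of my own, not part of the original answer; throttleAll, the batch size, and the interval are made-up names and values.

```javascript
// Hypothetical helper (not from the original answer): run promise-returning
// task functions in batches of `limit`, pausing `intervalMs` between batches.
// With limit = 100 and intervalMs = 60_000 this caps requests at ~100/minute.
const throttleAll = async (taskFns, limit, intervalMs) => {
  const results = [];
  for (let i = 0; i < taskFns.length; i += limit) {
    // Start one batch of requests concurrently and wait for all of them.
    const batch = taskFns.slice(i, i + limit).map((fn) => fn());
    results.push(...(await Promise.all(batch)));
    // Pause before the next batch, unless this was the last one.
    if (i + limit < taskFns.length) {
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  return results;
};
```

In the for await loop you would then push () => getUserByEmail(row.email), pushing a function rather than calling it so no request starts early, and finish with await throttleAll(tasks, 100, 60_000).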
