Processing large CSV uploads in Node.js


Problem description

Following up on a previous post here:

Node async loop - how to make this code run in sequential order?


...I'm looking for broader advice on processing large data upload files.

Scenario:


User uploads a very large CSV file with hundreds-of-thousands to millions of rows. It's streaming into an endpoint using multer:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});


Each row is transformed into a JSON object. That object is then mapped into several smaller ones, which need to be inserted into several different tables, spread out across, and accessed by, various microservice containers.

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    // save model.record1, model.record2, etc. sequentially,
    // then call callback() to move on to the next line
});
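
splitData itself isn't the important part here; purely as an illustration of the shape involved (the real mapping depends on the schema), think of it as something like:

// Hypothetical splitData - the shape below is an assumption, not the
// actual implementation. It maps one parsed CSV row into the smaller
// per-table records mentioned above.
const splitData = line => ({
    record1: { year: line.Year, make: line.Make },
    record2: { make: line.Make, model: line.Model }
});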


It's obvious I'm going to run into memory limitations with this approach. What is the most efficient manner for doing this?

Recommended answer


To avoid memory issues you need to process the file using streams - in plain words, incrementally.

You can do this with a combination of a CSV stream parser, to stream the binary contents as CSV rows, and through2, a stream utility that allows you to control the flow of the stream.

The flow goes like this:

  • You acquire a stream to the data
  • You pipe it through the CSV parser
  • You pipe it through a through2
  • You save each row in your database
  • When you're done saving, call cb() to move on to the next item.


I'm not familiar with multer but here's an example that uses a stream from a File.

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds one row of the CSV as an object,
    //   e.g. `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* item unless you call the callback
    //  `cb` on it.
    // - This allows us to save the row in our database/microservice and when
    //   we're done, we call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))


The example foo.csv CSV is this:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper



Notes

  • This approach avoids having to load the entire CSV in memory. As soon as a row is processed it goes out of scope/becomes unreachable, hence it's eligible for garbage collection. This is what makes this approach so memory efficient. Read the Streams Handbook for more info on streams.
  • You probably want to save/process more than 1 row per cycle. In that case push some rows into an array, process/save the entire array and then call cb to move on to the next chunk - repeating the process (see the batching sketch after this list).
  • Streams emit events that you can listen on. The end/error events are particularly useful for reporting back whether the operation was a success or a failure.
  • Express works with streams by default - I'm almost certain you don't need multer at all (see the second sketch after this list).
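
A rough sketch of the batching idea from the second note - the batch size and the bulk-save helper are placeholders, so adapt them to your own database/services. You'd pipe the CSV parser into this transform instead of the single-row one above:

const through2 = require('through2')

// Placeholder bulk saver (e.g. a multi-row INSERT), shown only to keep
// the sketch self-contained - it mimics the mock saver above.
const saveBatchIntoDatabase = rows =>
  new Promise(resolve => setTimeout(resolve, 500))

const BATCH_SIZE = 100 // placeholder: tune to your database/service limits
let batch = []

const batchingStream = through2(
  { objectMode: true },
  function (row, enc, cb) {
    batch.push(row)
    if (batch.length < BATCH_SIZE) {
      return cb() // keep buffering, ask for the next row
    }
    // batch is full: save it, then continue with an empty buffer
    saveBatchIntoDatabase(batch)
      .then(() => { batch = []; cb() })
      .catch(err => cb(err))
  },
  function (cb) {
    // flush: save whatever is left over when the CSV ends
    if (batch.length === 0) return cb()
    saveBatchIntoDatabase(batch)
      .then(() => cb())
      .catch(err => cb(err))
  }
)

// usage: fs.createReadStream('foo.csv').pipe(csv.createStream({ ... })).pipe(batchingStream)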

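And a sketch of the last two notes together - streaming the request through the parser without multer, and using the stream events to answer the client. It assumes the client sends the raw CSV as the request body (e.g. Content-Type: text/csv) rather than multipart/form-data; if it's multipart you still need a multipart parser in front. saveIntoDatabase is the same mock as above:

const express = require('express')
const csv = require('csv-stream')
const through2 = require('through2')

const router = express.Router()

// Mock saver, same shape as the one in the main example.
const saveIntoDatabase = row =>
  new Promise(resolve => setTimeout(resolve, 500))

// Assumption: the raw CSV arrives as the request body, so `req` itself is
// the readable stream we parse - no multer, no buffering in memory.
router.post('/', (req, res) => {
  req
    .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
    }))
    .pipe(through2({ objectMode: true }, (row, enc, cb) => {
      saveIntoDatabase(row)
        .then(() => cb(null, true))
        .catch(err => cb(err))
    }))
    .on('data', () => { /* keep the stream flowing */ })
    .on('error', err => res.status(500).send(err.message))
    .on('end', () => res.send('upload processed'))
})

module.exports = router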