Save a very big CSV to mongoDB using mongoose


Question

I have a CSV file containing more than 200'000 rows. I need to save it to MongoDB.

If I try a for loop, Node will run out of memory.

fs.readFile('data.txt', 'utf8', function(err, data) {
  if (err) throw err;

  // Loads the whole file into memory at once
  var lines = data.split('\n');

  for (var i = 0; i < lines.length; i += 1) {
    var row = lines[i].split(',');

    var obj = { /* The object to save */ };

    var entry = new Entry(obj);
    entry.save(function(err) {
      if (err) throw err;
    });
  }
});

How can I avoid running out of memory?

Answer

Welcome to streaming. What you really want is an "evented stream" that processes your input "one chunk at a time", ideally delimited by a common separator such as the "newline" character you are currently using.

For really efficient loading, you can add MongoDB "Bulk API" inserts to make the writes as fast as possible without eating up all of the machine's memory or CPU cycles.

Not advocating it over the various other solutions available, but here is a listing that uses the line-input-stream package to make the "line terminator" part simple.

Schema definitions are by "example" only:

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false });

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function() {

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the line on the delimiter
                    var obj = {};
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();            // stop reading while the batch writes
                        bulk.execute(function(err,result) {
                            if (err) return callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();       // carry on reading lines
                            callback();
                        });
                    } else {
                        callback();
                    }
                }
            ],
            function (err) {
                // each iteration is done
                if (err) throw err;
            }
        );

    });

    stream.on("end",function() {
        // flush any remaining queued operations
        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

mongoose.connect("mongodb://localhost/test");   // example connection string

So generally the "stream" interface there "breaks the input down" in order to process it "one line at a time". That stops you from loading everything at once.

The main part is the "Bulk Operations API" from MongoDB. This allows you to "queue up" many operations before actually sending them to the server. So in this case, with the use of a "modulo", writes are only sent per 1000 entries processed. You can really do anything up to the 16MB BSON limit, but keep it manageable.
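
If you are on a more recent mongoose/driver version, Model.bulkWrite() gives you roughly the same "queue up and flush" behaviour as the legacy Bulk API used above. The sketch below only illustrates that batching idea and is not part of the original listing; the ops array, BATCH_SIZE and the done callback are assumed names.

// Batching sketch with Model.bulkWrite(), assuming the same Entry model as above.
var BATCH_SIZE = 1000;              // flush threshold, same as the modulo above
var ops = [];

function queueInsert(doc, done) {
    ops.push({ insertOne: { document: doc } });

    if ( ops.length >= BATCH_SIZE ) {
        var batch = ops;
        ops = [];                           // start a fresh batch
        Entry.bulkWrite(batch)              // one round trip for the whole batch
            .then(function() { done(); })
            .catch(done);
    } else {
        done();                             // nothing to flush yet
    }
}

function flushRemaining(done) {             // call once the stream has ended
    if ( ops.length === 0 ) return done();
    Entry.bulkWrite(ops)
        .then(function() { ops = []; done(); })
        .catch(done);
}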

In addition to the operations being processed in bulk, there is an additional "limiter" in place from the async library. It's not really required, but it ensures that essentially no more than the "modulo limit" of documents are in process at any time. The batched "inserts" come at no IO cost other than memory, but the "execute" calls mean IO is in progress. So we wait rather than queuing up more.

There are surely better solutions you can find for "stream processing" CSV-type data, which this appears to be. But in general this gives you the concepts for how to do it in a memory-efficient manner without eating CPU cycles as well.
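
As one example of such an alternative, the sketch below uses Node's built-in readline module in place of line-input-stream, and Mongoose's insertMany() for the batching. The "data.csv" path, the 1000 batch size and the parseLine() helper are illustrative assumptions only, not something from the answer above.

var fs = require("fs"),
    readline = require("readline"),
    mongoose = require("mongoose");

var entrySchema = new mongoose.Schema({}, { strict: false });
var Entry = mongoose.model("Entry", entrySchema);

// parseLine() is a hypothetical helper: map one CSV line to a plain object.
function parseLine(line) {
    var cols = line.split(",");
    return { raw: cols };                   // replace with real field mapping
}

mongoose.connection.on("open", function() {

    var rl = readline.createInterface({
        input: fs.createReadStream("data.csv")   // example path
    });

    var batch = [];

    rl.on("line", function(line) {
        batch.push(parseLine(line));

        if ( batch.length >= 1000 ) {
            rl.pause();                     // back-pressure: stop reading while writing
            Entry.insertMany(batch)
                .then(function() {
                    batch = [];
                    rl.resume();            // carry on with the next lines
                })
                .catch(function(err) { throw err; });
        }
    });

    rl.on("close", function() {
        // flush whatever is left, then disconnect
        if ( batch.length === 0 ) return mongoose.disconnect();
        Entry.insertMany(batch)
            .then(function() { return mongoose.disconnect(); })
            .catch(function(err) { throw err; });
    });

});

mongoose.connect("mongodb://localhost/test");   // example connection string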
