Can't populate big chunk of data to mongodb using Node.js

Problem Description

I am asked to import a big chunk of weather data collected from many sites all over the city. Each site has one computer with one folder, which is synced to a central server every 5 minutes. Every day, a new file is created. So, basically, the structure is like this, where each txt file is formatted as a CSV file: the first line contains the field names, and the remaining lines contain numbers.

folder_on_server
|__ site1 __ date1.txt
|        |__ date2.txt
|
|__ site2 __ date1.txt
         |__ date2.txt
I wrote a small node.js app to populate that data into MongoDB. However, currently we have only 3 sites, but each site has almost 900 txt files, and each file contains 24*12 = 288 rows (as data is recorded every 5 minutes). I tried to run the node app, but after reading about 100 files of the first folder, the program crashes with an error about memory allocation failure.
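For context, here is a minimal sketch of the read-everything-first pattern that typically hits this limit. This is hypothetical: the actual tomongo.js is not shown in the question, and the path and parsing details below are placeholders.

var Fs = require('fs');

// Hypothetical naive version: every file in a site folder is read and parsed
// up front, so roughly 3 sites x 900 files x 288 rows of objects (plus the
// raw file strings) accumulate in memory before anything is written to MongoDB.
var basePath = '/path/to/folder_on_server/site1'; // placeholder path
var allRows = [];

Fs.readdirSync(basePath).forEach(function (file) {
  var lines = Fs.readFileSync(basePath + '/' + file, 'utf8').trim().split('\n');
  var fields = lines[0].split(',');
  lines.slice(1).forEach(function (line) {
    var row = {};
    line.split(',').forEach(function (value, i) {
      row[fields[i]] = value;
    });
    allRows.push(row);
  });
});
// ...allRows is then inserted into MongoDB in one go.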

I have tried many ways to improve this:

  1. Increased Node.js's memory size to 8GB => better, more files were read, but it still couldn't move on to the next folder.
  2. Set some variables to null and undefined at the end of the _.forEach loop (I use underscore) => didn't help.
  3. Shifted the files array (obtained with fs.readdir) so that the first element is removed => didn't help either.

Is there any way to force JS to clean up memory each time it finishes reading a file? Thanks.

Update 1: I ended up adding 100 files to each folder at a time. This seems tedious, but it worked, and this is a one-time job. However, I would still like to find a solution for this.
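A rough sketch of how that manual chunking could be automated, assuming the existing per-file import logic is wrapped in a processFile(file, callback) function (a hypothetical name, not from the original script):

var Async = require('async');

// Split the file list into chunks of 100 and import one chunk at a time,
// so only a bounded number of files is ever in flight.
function importInChunks(files, processFile, done) {
  var chunks = [];
  for (var i = 0; i < files.length; i += 100) {
    chunks.push(files.slice(i, i + 100));
  }
  Async.eachSeries(chunks, function (chunk, next) {
    Async.each(chunk, processFile, next); // up to 100 files concurrently
  }, done);
}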

Recommended Answer

Try using streams instead of loading each file into memory.

I've sent you a pull request with an implementation using streams and async I/O.

Here's most of it:

var Async = require('async');
var Csv = require('csv-streamify');
var Es = require('event-stream');
var Fs = require('fs');
var Mapping = require('./folder2siteRef.json');
var MongoClient = require('mongodb').MongoClient;

var sourcePath = '/hnet/incoming/' + new Date().getFullYear();

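// Async.auto runs each task once its listed dependencies have finished:
// 'loadData' waits for both the db connection and the directory listing,
// and 'cleanUp' closes the connection after everything has been loaded.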
Async.auto({
  db: function (callback) {
    console.log('opening db connection');
    MongoClient.connect('mongodb://localhost:27017/test3', callback);
  },
  subDirectory: function (callback) {
    // read the list of subfolders, which are the sites
    Fs.readdir(sourcePath, callback);
  },
  loadData: ['db', 'subDirectory', function (callback, results) {
    Async.each(results.subDirectory, load(results.db), callback);
  }],
  cleanUp: ['db', 'loadData', function (callback, results) {
    console.log('closing db connection');
    results.db.close(callback);
  }]
}, function (err) {
  console.log(err || 'Done');
});

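// load(db) returns a per-site worker: it lists the site's files and pipes
// each one through CSV parsing -> transform -> batch -> bulk insert.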
var load = function (db) {
  return function (directory, callback) {
    var basePath = sourcePath + '/' + directory;
    Async.waterfall([
      function (callback) {
        Fs.readdir(basePath, callback); // array of files in a directory
      },
      function (files, callback) {
        console.log('loading ' + files.length + ' files from ' + directory);
        Async.each(files, function (file, callback) {
          Fs.createReadStream(basePath + '/' + file)
            .pipe(Csv({objectMode: true, columns: true}))
            .pipe(transform(directory))
            .pipe(batch(200))
            .pipe(insert(db).on('end', callback));
        }, callback);
      }
    ], callback);
  };
};

var transform = function (directory) {
  return Es.map(function (data, callback) {
    data.siteRef = Mapping[directory];
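    // TheTime appears to be a spreadsheet-style serial date (days since
    // 1899-12-30): subtracting 25569 gives days since the Unix epoch,
    // * 86400 gives seconds, and the 6 hours is presumably a timezone offset.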
    data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600;
    callback(null, data);
  });
};

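// insert(db) receives the arrays emitted by batch() and writes each one to
// the 'hnet' collection with a single unordered bulk operation.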
var insert = function (db) {
  return Es.map(
    function (data, callback) {
      if (data.length) {
        var bulk = db.collection('hnet').initializeUnorderedBulkOp();
        data.forEach(function (doc) {
          bulk.insert(doc);
        });
        bulk.execute(callback);
      } else {
        callback();
      }
    }
  );
};

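// batch(batchSize) buffers incoming rows and re-emits them as arrays of
// batchSize documents, flushing whatever is left when the stream ends.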
var batch = function (batchSize) {
  batchSize = batchSize || 1000;
  var batch = [];

  return Es.through(
    function write (data) {
      batch.push(data);
      if (batch.length === batchSize) {
        this.emit('data', batch);
        batch = [];
      }
    },
    function end () {
      if (batch.length) {
        this.emit('data', batch);
        batch = [];
      }
      this.emit('end');
    }
  );
};

I've updated your tomongo.js script to use streams. I've also changed it to use async instead of sync for its file I/O.

I tested this against the structure defined in your code with small data sets and it worked really well. I did some limited testing against 3 dirs of 900 files with 288 lines each. I'm not sure how big each row of your data is, so I threw a few random properties in. It's quite fast. See how it goes with your data. If it causes issues, you could try throttling it with different write concerns when executing the bulk insert operation.
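For example, the execute call in insert() could be given an explicit write concern; a possible tweak, assuming the driver version in use accepts a write-concern object as the first argument to execute():

// Inside insert(): acknowledge each batch before moving on (w: 1); raising w
// or adding j: true slows the writer further if the server can't keep up.
bulk.execute({ w: 1 }, callback);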

Also check out some of these links for more information on streams in node.js:

http://nodestreams.com - a tool written by John Resig with many stream examples.

Also, event-stream is a very useful streams module.
