Import CSV Using Mongoose Schema

Question

Currently I need to push a large CSV file into a mongo DB and the order of the values needs to determine the key for the DB entry:

Example CSV file:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0

Code to parse it into arrays:

var fs = require("fs");
var csv = require("fast-csv");

fs.createReadStream("rank.txt")
    .pipe(csv())
    .on("data", function(data){
        console.log(data);
    })
    .on("end", function(data){
        console.log("Read Finished");
    });

Code output:

[ '9',
  '1557',
  '358',
  '286',
  'Mutantville',
  '4368',
  '2358026',
  '',
  'M',
  '0',
  '0',
  '0',
  '1',
  '0' ]
[ '9',
  '1557',
  '359',
  '147',
  'Wroogny',
  '4853',
  '2356061',
  '',
  'D',
  '0',
  '0',
  '0',
  '1',
  '0' ]

How do I insert the arrays into my mongoose schema to go into mongo db?

Schema:

var mongoose = require("mongoose");

var rankSchema = new mongoose.Schema({
   serverid: Number,
   resetid: Number,
   rank: Number,
   number: Number,
   name: String,
   land: Number,
   networth: Number,
   tag: String,
   gov: String,
   gdi: Number,
   protection: Number,
   vacation: Number,
   alive: Number,
   deleted: Number
});

module.exports = mongoose.model("Rank", rankSchema);

The order of the array needs to match the order of the schema; for instance, the first number 9 in the array always needs to be saved as the key "serverid", and so forth. I'm using Node.js.

Answer

You can do it with fast-csv by getting the headers from the schema definition which will return the parsed lines as "objects". You actually have some mismatches, so I've marked them with corrections:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    // Clear all registered models' collections for a clean demonstration run
    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    // Field names from the schema, minus the internal _id and __v paths
    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();              // hold back further "data" events
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {   // flush the buffer every 10,000 docs
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();             // let the next line through

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {       // send whatever is left in the buffer
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
            resolve();                 // settle even if the buffer was already empty
          } catch(e) {
            stream.destroy(e);
          }
        });

    });

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

As long as the schema actually lines up with the provided CSV then it's okay. These are the corrections that I can see, but if you need the actual field names aligned differently then you need to adjust. Basically, though, there was a Number in the position where there is a String, and essentially an extra field, which I'm presuming is the blank one in the CSV.

The general approach is getting the array of field names from the schema and passing that into the options when making the csv parser instance:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))
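
For reference, schema paths are kept in declaration order, so with the corrected schema above the filtered headers array should print as (from a quick console.log(headers) check):

[ 'serverid', 'resetid', 'rank', 'name', 'land', 'networth',
  'tag', 'stuff', 'gov', 'gdi', 'protection', 'vacation',
  'alive', 'deleted' ]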

Once you actually do that then you get an "Object" back instead of an array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Don't worry about the "types" because Mongoose will cast the values according to the schema.

The rest happens within the handler for the data event. For maximum efficiency we are using insertMany() to only write to the database once every 10,000 lines. How that actually goes to the server and gets processed depends on the MongoDB version, but 10,000 should be a pretty reasonable "trade-off" between memory usage for the collected documents and the size of a reasonable network request, based on the average number of fields you would import for a single collection. Make the number smaller if necessary.

The important parts are to mark these calls as async functions and await the result of the insertMany() before continuing. Also we need to pause() the stream and resume() on each item, otherwise we run the risk of overwriting the buffer of documents to insert before they are actually sent. The pause() and resume() are necessary to put "back-pressure" on the pipe; otherwise items just keep "coming out" and firing the data event.
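
In isolation the pattern looks like this (a stripped-down sketch; processAsync() is just a placeholder for the buffered insertMany() logic above):

stream.on("data", async item => {
  stream.pause();             // no further "data" events while we work
  await processAsync(item);   // placeholder for the buffer/insertMany() logic
  stream.resume();            // ask the stream for the next item
});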

Naturally the control for the 10,000 entries requires we check that both on each iteration and on stream completion in order to empty the buffer and send any remaining documents to the server.

That's really what you want to do, as you certainly don't want to fire off an async request to the server on "every" iteration of the data event, essentially without waiting for each request to complete. You'll get away with not checking that for "very small files", but for any real world load you're certain to exceed the call stack due to "in flight" async calls which have not yet completed.

FYI - the package.json used. The mz package is optional, as it's just a modernized, Promise-enabled version of the standard node "built-in" libraries that I'm simply used to using. The code is of course completely interchangeable with the fs module.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}
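
Since only createReadStream() is used from it in these listings, swapping the import is the entire change:

// const fs = require('mz/fs');   // the promisified wrapper from the listings
const fs = require('fs');         // the core module behaves identically here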


Actually, with Node v8.9.x and above we can even make this much simpler with an implementation of AsyncIterator through the stream-to-iterator module. It's still in Iterator<Promise<T>> mode, but it should do until Node v10.x becomes stable LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    // Wrap the stream so each parsed line can simply be awaited in turn
    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;      // each iteration yields a Promise of a line
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {         // flush the buffer every 10,000 docs
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {               // send any remaining documents
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Basically, all of the stream "event" handling and pausing and resuming gets replaced by a simple for loop:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Easy! This gets cleaned up in later Node implementations with for await...of once it becomes more stable. But the above runs fine from the specified version and up.
