Import CSV Using Mongoose Schema


Problem Description

Currently I need to push a large CSV file into a MongoDB database, and the order of the values needs to determine the key for each DB entry:

Example CSV file:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0

Code to parse it into arrays:

var fs = require("fs");
var csv = require("fast-csv");

fs.createReadStream("rank.txt")
    .pipe(csv())
    .on("data", function(data){
        console.log(data);
    })
    .on("end", function(data){
        console.log("Read Finished");
    });

Output:

[ '9',
  '1557',
  '358',
  '286',
  'Mutantville',
  '4368',
  '2358026',
  '',
  'M',
  '0',
  '0',
  '0',
  '1',
  '0' ]
[ '9',
  '1557',
  '359',
  '147',
  'Wroogny',
  '4853',
  '2356061',
  '',
  'D',
  '0',
  '0',
  '0',
  '1',
  '0' ]

How do I insert the arrays into my Mongoose schema to go into MongoDB?

Schema:

var mongoose = require("mongoose");

var rankSchema = new mongoose.Schema({
   serverid: Number,
   resetid: Number,
   rank: Number,
   number: Number,
   name: String,
   land: Number,
   networth: Number,
   tag: String,
   gov: String,
   gdi: Number,
   protection: Number,
   vacation: Number,
   alive: Number,
   deleted: Number
});

module.exports = mongoose.model("Rank", rankSchema);

The order of the array needs to match the order of the schema; for instance, the first number 9 in the array always needs to be saved under the key "serverid", and so forth. I'm using Node.js.

Recommended Answer

You can do it with fast-csv by getting the headers from the schema definition, which will return the parsed lines as "objects". You actually have some mismatches, so I've marked them with corrections:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    // clear existing documents from every registered model before importing
    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);
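    // headers now holds the schema's field names in declaration order, e.g.
    // [ 'serverid', 'resetid', 'rank', 'name', 'land', ... ]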

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            // flush the buffer to the server every 10,000 parsed rows
            if ( counter >= 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

As long as the schema actually lines up with the provided CSV, then it's okay. These are the corrections I can see, but if you need the actual field names aligned differently then you need to adjust. Basically there was a Number in the position where there is a String, and essentially an extra field, which I'm presuming is the blank one in the CSV.

The general approach is to get the array of field names from the schema and pass it into the options when creating the csv parser instance:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Once you actually do that, you get an "Object" back instead of an array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Don't worry about the "types", because Mongoose will cast the values according to the schema.
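For illustration, a minimal sketch of that casting, reusing the Rank model defined above (the string values are taken from the parsed CSV row shown earlier):

const doc = new Rank({ serverid: "9", networth: "4368" });

console.log(typeof doc.serverid);  // 'number' - cast by the schema on assignment
console.log(doc.networth + 1);     // 4369 - numeric addition, not string concatenation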

The rest happens within the handler for the data event. For maximum efficiency we use insertMany() to write to the database only once every 10,000 lines. How that actually goes to the server and is processed depends on the MongoDB version, but 10,000 should be fairly reasonable based on the average number of fields you would import for a single collection, as a trade-off between memory usage and a reasonably sized network request. Make the number smaller if necessary.
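The same batching idea can be factored out into a small helper; this is a sketch only, and flush() and BATCH_SIZE are illustrative names rather than part of the original code:

const BATCH_SIZE = 10000;  // tune down for wide documents or low memory

async function flush(model, buffer) {
  if (buffer.length > 0) {
    await model.insertMany(buffer);  // one network round trip per batch
    buffer.length = 0;               // empty the array in place
  }
}

Calling such a helper both when the counter reaches the batch size inside the data handler and once more on "end" covers the two flush points discussed below.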

The important parts are to mark these callbacks as async functions and await the result of insertMany() before continuing. We also need to pause() the stream and resume() on each item (see https://nodejs.org/api/stream.html#stream_readable_resume), otherwise we run the risk of overwriting the buffer of documents to insert before they are actually sent. The pause() and resume() are necessary to put "back-pressure" on the pipe; otherwise items just keep "coming out" and firing the data event.
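Stripped of the CSV specifics, the pattern is just this (a sketch; doSomethingAsync() is a placeholder for the buffering and insertMany() work, not part of the answer):

const fs = require('fs');

// stand-in for the real buffered insertMany() logic
const doSomethingAsync = chunk => Promise.resolve(chunk);

const stream = fs.createReadStream('input.csv');

stream.on('data', async chunk => {
  stream.pause();                  // back-pressure: stop further 'data' events
  await doSomethingAsync(chunk);   // finish the write before accepting more input
  stream.resume();                 // ask the stream for the next chunk
});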

Naturally, controlling for the 10,000 entries requires that we check both on each iteration and on stream completion, in order to empty the buffer and send any remaining documents to the server.

That's really what you want to do, as you certainly don't want to fire off an async request to the server on "every" iteration of the data event, or essentially without waiting for each request to complete. You would get away with not checking that for "very small files", but for any real-world load you're certain to exceed the call stack due to "in-flight" async calls which have not yet completed.

FYI - the package.json used. The mz is optional, as it's just a modernized, Promise-enabled library of standard node "built-in" libraries that I'm simply used to using. The code is of course completely interchangeable with the fs module.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}
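For instance, since the code above only uses fs.createReadStream(), which is identical in the core module, dropping mz is a one-line change:

const fs = require('fs');   // instead of require('mz/fs')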

---

Actually, with Node v8.9.x and above we can even make this much simpler with an implementation of AsyncIterator through the stream-to-iterator module. It's still in Iterator<Promise<T>> mode, but it should do until Node v10.x becomes stable LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();
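    // each item yielded by the iterator is a Promise for the next parsed row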

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      // flush the buffer to the server every 10,000 parsed rows
      if ( counter >= 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Basically, all of the stream "event" handling, pausing, and resuming gets replaced by a simple for loop:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Easy! This gets cleaned up in a later Node implementation with for..await..of when it becomes more stable. But the above runs fine on the specified version and above.
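As a sketch of where that is heading (assuming a Node version where Readable streams are async-iterable; importRanks is an illustrative name, not part of the answer), the loop body stays exactly the same:

const fs = require('fs');
const csv = require('fast-csv');

async function importRanks(headers, Rank) {
  const stream = fs.createReadStream('input.csv').pipe(csv({ headers }));

  let buffer = [];
  for await (const doc of stream) {  // no pause/resume and no extra module needed
    buffer.push(doc);
    if (buffer.length >= 10000) {
      await Rank.insertMany(buffer);
      buffer = [];
    }
  }
  if (buffer.length > 0) await Rank.insertMany(buffer);
}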

