How to buffer MongoDB inserts during disconnect in node.js?

Question

We read an XML file (using xml-stream) with about 500k elements and insert them into MongoDB like this:

xml.on('endElement: product', writeDataToDb.bind(this, 'product'));

In writeDataToDb(type, obj) we insert like this:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });
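
For context, the pieces above presumably fit together roughly like the sketch below; the connection wiring and the writeDataToDb body are assumptions, only the two snippets above are verbatim from the question:

var fs = require('fs')
var Xml = require('xml-stream')
var MongoClient = require('mongodb').MongoClient

MongoClient.connect('mongodb://localhost:27017/test', function (err, db) {
  if (err) throw err
  var collection = db.collection('product')

  // hypothetical helper wrapping the insertOne call shown above
  function writeDataToDb (type, obj) {
    collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { })
  }

  var xml = new Xml(fs.createReadStream('data.xml'))
  xml.on('endElement: product', writeDataToDb.bind(this, 'product'))
})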

Now when the Mongo connection gets disconnected, the xml stream keeps reading and the console gets flooded with error messages (can't insert, disconnected, EPIPE broken, ...).

In the docs it says:

When you shut down the mongod process, the driver stops processing operations and keeps buffering them due to bufferMaxEntries being -1 by default, meaning it buffers all operations.
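
For reference, this default can be changed through the connect options; a minimal sketch in the 2.x driver style used by the answer below (the URL and option placement are assumptions):

var MongoClient = require('mongodb').MongoClient

MongoClient.connect('mongodb://localhost:27017/test', {
  bufferMaxEntries: 0 // 0 fails operations immediately while disconnected; -1 (default) buffers them all
}, function (err, db) {
  if (err) throw err
  // inserts issued while mongod is down now error out instead of queueing up
})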

What does this buffer actually do?

We notice that when we insert data and shut down the mongo server, things get buffered; then we bring the mongo server back up, the native driver successfully reconnects and node resumes inserting data, but the documents buffered while mongo was offline do not get inserted again.

So I question this buffer and its use.

Goal:

We are looking for the best way to keep inserts in a buffer until mongo comes back (within 15000 ms according to wtimeout) and then insert the buffered documents, or to make use of xml.pause() and xml.resume(), which we tried without success.
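
The pause/resume attempt presumably looked something like the following sketch; the per-document retry loop and the 3000 ms interval are assumptions, not code from the question:

// assuming collection and xml are initialised as in the first snippet above
xml.on('endElement: product', function (product) {
  xml.pause() // stop reading while this document is written
  function tryInsert () {
    collection.insertOne(product, {w: 1, wtimeout: 15000}, function (err) {
      if (err) {
        setTimeout(tryInsert, 3000) // mongo is probably down: retry the same document
      } else {
        xml.resume() // persisted, move on to the next element
      }
    })
  }
  tryInsert()
})

Pausing and resuming around every single document is slow for ~500k elements, which is one reason the answer below batches the inserts instead.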

Basically we need a little help in how to handle disconnects without data loss or interruptions.

Answer

Inserting 500K elements with insertOne() is a very bad idea. You should instead use bulk operations, which allow you to insert many documents in a single request (for example 10000 at a time, so it can be done in 50 requests). To avoid the buffering issue, you can handle it manually:

  1. Disable buffering with bufferMaxEntries: 0
  2. Set the reconnect properties: reconnectTries: 30, reconnectInterval: 1000
  3. Create a bulk operation and feed it 10000 items at a time
  4. Pause the xml reader, try to insert the 10000 items, and if that fails retry every 3000 ms until it succeeds
  5. If the bulk operation is interrupted during execution you may face some duplicate ID issues, so ignore them (error code: 11000)

Here is an example script:

var fs = require('fs')
var Xml = require('xml-stream')

var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
  reconnectTries: 30,
  reconnectInterval: 1000,
  bufferMaxEntries: 0 // fail fast instead of buffering operations while disconnected
}, function (err, db) {
  if (err != null) {
    console.log('connect error: ' + err)
  } else {
    var collection = db.collection('product')
    var bulk = collection.initializeUnorderedBulkOp()
    var totalSize = 500001 // known number of product elements in the file
    var size = 0

    var fileStream = fs.createReadStream('data.xml')
    var xml = new Xml(fileStream)
    xml.on('endElement: product', function (product) {
      bulk.insert(product)
      size++
      // once we have enough products, save them using a bulk insert
      if (size % 10000 === 0) {
        xml.pause() // stop reading while the batch is being written
        bulk.execute(function (err, result) {
          if (err == null) {
            bulk = collection.initializeUnorderedBulkOp()
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
            xml.resume()
          } else {
            console.log('bulk insert failed: ' + err)
            var counter = 0
            var retryInsert = setInterval(function () {
              counter++
              bulk.execute(function (err, result) {
                if (err == null) {
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else if (err.code === 11000) { // ignore duplicate key errors from a partially applied batch
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else {
                  console.log('failed after first try: ' + counter, 'error: ' + err)
                }
              })
            }, 3000) // retry every 3000ms until success
          }
        })
      } else if (size === totalSize) {
        // flush the final partial batch, then close the connection
        bulk.execute(function (err, result) {
          if (err == null) {
            db.close()
          } else {
            console.log('bulk insert failed: ' + err)
          }
        })
      }
    })
  }
})

Sample log output:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]
