如何防止多个文档被添加到mongodb数据库 [英] How to prevent multiple documents from being added to mongodb database

查看:32
本文介绍了如何防止多个文档被添加到mongodb数据库的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有多个工人"服务器运行作业,有时不止一台服务器将运行相同的作业.我只希望一台服务器能够完成这项工作,因此我将以下索引添加到 Message 架构中以防止重复消息:

<预><代码>const 消息 = 新架构({accountID: {type: Schema.Types.ObjectId, ref: "Account", required: true},列表ID:{type:Schema.Types.ObjectId,参考:列表",需要:true},messageRuleID: {type: Schema.Types.ObjectId, ref: "MessageRule", required: true},保留ID:{type:Schema.Types.ObjectId,参考:保留",需要:true},锁定在:日期,状态:字符串},{时间戳:真实});消息索引({列表ID:1,消息规则 ID: 1,预订ID:1},{唯一:真实,名称:Message_index_0"});

然后我使用下面的代码来创建并锁定一条消息,以防止一条消息发送两次:

const messageQuery = {accountID:604f9355eeab332490184532",列表ID:604f9358be89f997345b238d",messageRuleID:607d44e75d54c700041f38e1",预订ID:605118b7694b9765f49787e1";};异步函数开始(){const time = moment().toDate();尝试 {const message = await Message.create(messageQuery);console.log(创建一条消息", message._id);} 捕捉(错误){如果(错误代码!== 11000){//忽略重复键错误抛出错误;} 别的 {控制台日志(重复消息");}}const cutoff = moment().subtract()Config.messageSendLock.amount,Config.messageSendLock.unit);const message = await Message.findOneAndUpdate({...消息查询,状态:{$nin: [已禁用",已发送"]},$或:[{lockedAt: {$exists: false}},{lockedAt:空},{lockedAt:{$lte:截止}}]},{lockedAt:moment().toDate()});//如果没有定义消息,那么它要么被发送,要么被禁用,要么被锁定如果(!消息){console.log(现有,消息要么被发送,要么被禁用,要么被锁定");返回;}等待 Message.findOneAndUpdate(messageQuery, {状态:已发送",$unset: {lockedAt: ""}//不确定是否需要});console.log(SEND MESSAGE", time, message._id);}

如果我把它放在一个 forEach 循环中并运行它一百次,那么这段代码效果很好,但是当我从多个服务器运行它时,每个服务器都有自己的数据库连接,一些服务器认为他们正在创建一个文档并且能够发送两次消息.

我已经能够通过使用 throng 重现这个问题,见下文:

人群({工人:开始,计数:50,寿命:无限});

当我运行上面的代码时,这是控制台输出:

创建了一条消息 60905845a3bd3922c477182f重复消息现有,消息被发送、禁用或被锁定发送消息 2021-05-03T20:08:37.257Z 60905845a3bd3922c477182f创建了一条消息 60905845a3caa422c6bbb74d现有,消息被发送、禁用或被锁定创建了一条消息 6090584561583c22c8752b61创建了一条消息 6090584529dcee22c733a1eb现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定创建了一条消息 6090584569c22d22c216733a重复消息现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定重复消息创建了一条消息 609058451bb43d22cc5519d5重复消息已存在,消息已发送、已禁用或已锁定发送消息 2021-05-03T20:08:37.765Z 609058451bb43d22cc5519d5重复消息重复消息现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定创建了一条消息 609058469c84e322cd30b326现有,消息被发送、禁用或被锁定创建了一条消息 60905846b2ab3e22d215792f重复消息重复消息现有,消息被发送、禁用或被锁定发送消息 2021-05-03T20:08:38.461Z 60905846b2ab3e22d215792f创建了一条消息 60905846697d5922d02fb377现有,消息被发送、禁用或被锁定发送消息 2021-05-03T20:08:38.466Z 60905846697d5922d02fb377重复消息创建消息 60905846a6c30622d6350eea创建了一条消息 609058462b0ce822d9d9dca1现有,消息被发送、禁用或被锁定发送消息 2021-05-03T20:08:38.784Z 60905846a6c30622d6350eea发送消息 2021-05-03T20:08:38.968Z 609058462b0ce822d9d9dca1创建了一条消息 60905846e61d3e22dbe1542a创建了一条消息 60905846065b3822d428f2c9创建了一条消息 6090584708f85722e0df9cd2创建了一条消息 6090584734d99922dd612ae3现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定创建了一条消息 60905847fd807822dfc23f2重复消息重复消息创建了一条消息 609058470a07ff22daafe0db重复消息重复消息重复消息重复消息重复消息现有,消息被发送、禁用或被锁定创建了一条消息 60905847abbbe522eac8be2e重复消息发送消息 2021-05-03T20:08:39.517Z 609058470a07ff22daafe0db重复消息现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定已存在,消息已发送、已禁用或已锁定已存在,消息已发送、已禁用或已锁定现有,消息被发送、禁用或被锁定创建了一条消息 60905847d5283d22f3aa6239已存在,消息已发送、已禁用或已锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定发送消息 2021-05-03T20:08:39.529Z 60905847d5283d22f3aa6239现有,消息被发送、禁用或被锁定创建了一条消息 6090584717434e22e3634596重复消息重复消息重复消息重复消息现有,消息被发送、禁用或被锁定重复消息重复消息重复消息重复消息现有,消息被发送、禁用或被锁定重复消息现有,消息被发送、禁用或被锁定发送消息 2021-05-03T20:08:39.573Z 6090584717434e22e3634596重复消息现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定重复消息现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定重复消息现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定现有,消息被发送、禁用或被锁定

您可以看到它以某种方式能够绕过检查并多次发送消息.有什么办法可以防止消息发送两次吗?

更新:

连接到数据库:

mongoose.Promise = global.Promise;mongoose.connect(Config.uristring, {useNewUrlParser: 真,使用创建索引:真,useFindAndModify: 假,useUnifiedTopology: true});

数据库设置:

托管在 Mongodb Atlas

数据库版本:4.2.13

副本集 - 3 个节点

"mongodb://abc:def@ghi-00.mongodb.net:27017,ghi-01.mongodb.net:27017,ghi-02.mongodb.net:27017/jkl?ssl=true&replicaSet=ghi-00&authSource=admin";

更新 2:

我在 message.create() 代码和锁定消息的 findOneAndUpdate() 代码之间添加了一个 sleep() 函数.我让每个人睡了 2 到 7 秒的随机时间,并且似乎大部分时间都解决了这个问题.这只是一个黑客,而不是一个好的长期解决方案.

await sleep(Math.floor(Math.random() * 50) * 100 + 2000);

function sleep(ms) {返回新的承诺(解决 => {设置超时(解决,毫秒);});}

解决方案

问题出在猫鼬与人群的兼容性上.

它会触发新的connect"每个线程"的事件然后删除连接 https://gist.github.com 上的文档/wootwoot1234/49cb7d082850d93f8cd03da164644cfb#file-index-js-L100:

db.once('open', async function() {等待 Message.deleteOne(messageQuery);//<=== 从这里删除它以解决问题人群({工人:开始,计数:50,寿命:无限});});

一定是因为throng在线程之间隔离全局变量的方式,特别是连接池.默认情况下 mongoose 连接池是 5,这意味着一次不超过 5 个连接到数据库.驱动程序使它们保持打开状态并重复使用以节省连接成本.

如果您检查 Atlas 连接监控,您会在运行此脚本时看到一些额外的 100 个连接.

I have multiple "worker" servers running jobs and sometimes more than one server will run the same job. I only want one server to be able to complete the job so I added the following index to a Message schema to prevent duplicate messages:


const Message = new Schema(
    {
        accountID: {type: Schema.Types.ObjectId, ref: "Account", required: true},
        listingID: {type: Schema.Types.ObjectId, ref: "Listing", required: true},
        messageRuleID: {type: Schema.Types.ObjectId, ref: "MessageRule", required: true},
        reservationID: {type: Schema.Types.ObjectId, ref: "Reservation", required: true},
        lockedAt: Date,
        status: String
    },
    {timestamps: true}
);

Message.index(
    {
        listingID: 1,
        messageRuleID: 1,
        reservationID: 1
    },
    {unique: true, name: "Message_index_0"}
);

Then I use the following code to create and lock a message to prevent a message from sending twice:

const messageQuery = {
    accountID: "604f9355eeab332490184532",
    listingID: "604f9358be89f997345b238d",
    messageRuleID: "607d44e75d54c700041f38e1",
    reservationID: "605118b7694b9765f49787e1"
};

async function start() {
    const time = moment().toDate();
    try {
        const message = await Message.create(messageQuery);
        console.log("CREATED A MESSAGE", message._id);
    } catch (error) {
        if (error.code !== 11000) {
            // ignore duplicate key error
            throw error;
        } else {
            console.log("DUPLICATE MESSAGE");
        }
    }

    const cutoff = moment().subtract(
        Config.messageSendLock.amount,
        Config.messageSendLock.unit
    );
    const message = await Message.findOneAndUpdate(
        {
            ...messageQuery,
            status: {$nin: ["disabled", "sent"]},
            $or: [
                {lockedAt: {$exists: false}},
                {lockedAt: null},
                {lockedAt: {$lte: cutoff}}
            ]
        },
        {lockedAt: moment().toDate()}
    );

    // If no message is defined, then it's either sent, disabled, or is locked
    if (!message) {
        console.log("Existing, message is either sent, disabled, or is locked");
        return;
    }
    await Message.findOneAndUpdate(messageQuery, {
        status: "sent",
        $unset: {lockedAt: ""} // Not sure this is needed
    });
    console.log("SEND MESSAGE", time, message._id);
}

This code works great if I put it in a forEach loop and run it a hundred times but when I run it from multiple servers, each with its own connection to the database, some of the servers think they are creating a document and are able to send the message twice.

I've been able to reproduce this issue by using throng, see below:

throng({
    worker: start,
    count: 50,
    lifetime: Infinity
});

When I run the above code this is the console output:

CREATED A MESSAGE 60905845a3bd3922c477182f
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:37.257Z 60905845a3bd3922c477182f
CREATED A MESSAGE 60905845a3caa422c6bbb74d
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 6090584561583c22c8752b61
CREATED A MESSAGE 6090584529dcee22c733a1eb
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 6090584569c22d22c216733a
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
DUPLICATE MESSAGE
CREATED A MESSAGE 609058451bb43d22cc5519d5
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:37.765Z 609058451bb43d22cc5519d5
DUPLICATE MESSAGE
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 609058469c84e322cd30b326
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 60905846b2ab3e22d215792f
DUPLICATE MESSAGE
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:38.461Z 60905846b2ab3e22d215792f
CREATED A MESSAGE 60905846697d5922d02fb377
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:38.466Z 60905846697d5922d02fb377
DUPLICATE MESSAGE
CREATED A MESSAGE 60905846a6c30622d6350eea
CREATED A MESSAGE 609058462b0ce822d9d9dca1
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:38.784Z 60905846a6c30622d6350eea
SEND MESSAGE 2021-05-03T20:08:38.968Z 609058462b0ce822d9d9dca1
CREATED A MESSAGE 60905846e61d3e22dbe1542a
CREATED A MESSAGE 60905846065b3822d428f2c9
CREATED A MESSAGE 6090584708f85722e0df9cd2
CREATED A MESSAGE 6090584734d99922dd612ae3
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 60905847fd807822dffc23f2
DUPLICATE MESSAGE
DUPLICATE MESSAGE
CREATED A MESSAGE 609058470a07ff22daafe0db
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 60905847abbbe522eac8be2e
DUPLICATE MESSAGE
SEND MESSAGE 2021-05-03T20:08:39.517Z 609058470a07ff22daafe0db
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 60905847d5283d22f3aa6239
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:39.529Z 60905847d5283d22f3aa6239
Existing, message is either sent, disabled, or is locked
CREATED A MESSAGE 6090584717434e22e3634596
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
SEND MESSAGE 2021-05-03T20:08:39.573Z 6090584717434e22e3634596
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
DUPLICATE MESSAGE
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked
Existing, message is either sent, disabled, or is locked

You can see it's somehow able to bypass the checks and send the message multiple times. Is there any way to prevent the message from sending twice?

Update:

Connection to db:

mongoose.Promise = global.Promise;
mongoose.connect(Config.uristring, {
    useNewUrlParser: true,
    useCreateIndex: true,
    useFindAndModify: false,
    useUnifiedTopology: true
});

Database settings:

Hosted on Mongodb Atlas

Db Version: 4.2.13

Replica Set - 3 nodes

"mongodb://abc:def@ghi-00.mongodb.net:27017,ghi-01.mongodb.net:27017,ghi-02.mongodb.net:27017/jkl?ssl=true&replicaSet=ghi-00&authSource=admin";

Update 2:

I added a sleep() function between the message.create() code and the findOneAndUpdate() code that locks the message. I made each one sleep for a random amount of time from 2 to 7 seconds and seemed to fix the issue most of the time. It's just a hack and not a good long-term solution.

await sleep(Math.floor(Math.random() * 50) * 100 + 2000);

function sleep(ms) {
    return new Promise(resolve => {
        setTimeout(resolve, ms);
    });
}

解决方案

The problem lies somewhere in mongoose compatibility with throng.

It fires new "connect" event for each "thread" and you delete the document on connect https://gist.github.com/wootwoot1234/49cb7d082850d93f8cd03da164644cfb#file-index-js-L100:

db.once('open', async function() {
  await Message.deleteOne(messageQuery); // <=== remove it from here to fix the problem
  throng({
    worker: start,
    count: 50,
    lifetime: Infinity
  });
});

It must be because of the way throng isolates global variables between threads, connection pool in particular. By default mongoose connection pool is 5, which means there are no more than 5 connections to the database at a time. The driver keeps them open and reuse to save on connection cost.

If you check Atlas connections monitoring you will see some extra 100 connections when you run this script.

这篇关于如何防止多个文档被添加到mongodb数据库的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆