Aggregate and update MongoDB


Question

I have 2 collections:

  • clients (6,000,000 documents)
  • orders (50,000,000 documents)

Once a day, I would like to calculate, per client, the number of orders placed in the past year, past month, and past week.

Here is what I tried:

db.orders.aggregate([
    { "$match": { "date_order": { "$gt": v_date1year } } },
    { "$group": {
        "_id": "$id_client",
        "count": { "$sum": 1 }
    }},
    { "$out": "tmp_indicators" }
])

db.tmp_indicators.find({}).forEach(function (my_client) { 
    db.clients.update (
        {"id_client": my_client._id},
        {"$set": 
            { "nb_orders_1year" : my_client.count }
        }
    )
})

I have to do this 3 times: once for the past-year aggregation, once for the past month, and once for the past week. The processing is very slow; is there a better way to do it?

Answer

For improved performance, especially when dealing with large collections, take advantage of the Bulk() API for bulk updates: operations are sent to the server in batches (say, 1000 at a time) rather than one request per document, as the update statement inside your current forEach() loop does. Sending only one request per 1000 operations makes the updates considerably more efficient and faster.

The following examples demonstrate this approach. The first uses the Bulk() API, available in MongoDB versions >= 2.6 and < 3.2, and updates the documents in the clients collection by setting the nb_orders_1year field to the value from the aggregation results.

Since the pipeline ends with $out, the aggregation results land in the tmp_indicators collection. Iterate it with forEach() to queue the bulk update operations in batches, which are then sent to the server efficiently:

var bulk = db.clients.initializeUnorderedBulkOp(),
    pipeline = [
        {
            "$match": { "date_order": { "$gt": v_date1year } }
        },
        {
            "$group": {
                "_id": "$id_client", 
                "count": { "$sum" : 1 }
            }
        },
        { "$out": "tmp_indicators" }        
    ],
    counter = 0;

db.orders.aggregate(pipeline);  
db.tmp_indicators.find().forEach(function (doc) {       
    bulk.find({ "id_client": doc._id }).updateOne({
        "$set": { "nb_orders_1year": doc.count }
    });

    counter++;
    if (counter % 1000 == 0) {
        bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
        bulk = db.clients.initializeUnorderedBulkOp();
    }
});
// Clean up remaining operations in queue
if (counter % 1000 != 0) { bulk.execute(); }
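The flush-every-1000 pattern above is easy to get wrong (forgetting the final flush silently drops the tail of the queue). Here is a minimal plain-JavaScript sketch of the same counter logic in isolation; makeBatcher is a hypothetical helper for illustration, not a MongoDB API:

```javascript
// Sketch of the flush-every-N pattern used in the loop above.
// "makeBatcher" is a hypothetical helper, not part of MongoDB.
function makeBatcher(batchSize, execute) {
    var queue = [];
    return {
        add: function (op) {
            queue.push(op);
            if (queue.length === batchSize) { // mirrors: counter % 1000 == 0
                execute(queue);
                queue = [];                   // mirrors: re-initialize the bulk op
            }
        },
        flush: function () {                  // mirrors: if (counter % 1000 != 0) bulk.execute()
            if (queue.length > 0) {
                execute(queue);
                queue = [];
            }
        }
    };
}
```

With 2,500 queued operations and a batch size of 1,000, execute fires three times (1000, 1000, and the remaining 500 on flush), which is exactly what the counter arithmetic in the loop above achieves.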

The next example applies to MongoDB version 3.2, which deprecates the Bulk() API and provides a newer set of APIs using bulkWrite().

It uses the same pipeline as above, but instead of iterating the result one document at a time, it builds the array of bulk operations with map():

var pipeline = [
    {
        "$match": { "date_order": { "$gt": v_date1year } }
    },
    {
        "$group": {
            "_id": "$id_client",
            "count": { "$sum": 1 }
        }
    },
    { "$out": "tmp_indicators" }
];
db.orders.aggregate(pipeline);
var bulkOps = db.tmp_indicators.find().map(function (doc) { 
        return { 
            "updateOne": { 
                "filter": { "id_client": doc._id },
                "update": { "$set": { "nb_orders_1year": doc.count } } 
            }         
        };
    });

db.clients.bulkWrite(bulkOps, { "ordered": true });
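Finally, the pipeline still runs three times (year, month, week). Because the yearly $match already contains every order from the monthly and weekly windows, all three counters can be computed in a single pass by putting a $cond expression inside $sum. The plain-JavaScript sketch below shows what such a single $group stage computes; countWindows is a hypothetical illustration of the logic, not a MongoDB API:

```javascript
// Single-pass equivalent of running the aggregation three times.
// The corresponding $group stage would look like:
//   { $group: { _id: "$id_client",
//       nb_orders_1year:  { $sum: 1 },
//       nb_orders_1month: { $sum: { $cond: [{ $gt: ["$date_order", v_date1month] }, 1, 0] } },
//       nb_orders_1week:  { $sum: { $cond: [{ $gt: ["$date_order", v_date1week] }, 1, 0] } } } }
// "countWindows" is a hypothetical helper for illustration only.
function countWindows(orders, d1year, d1month, d1week) {
    var acc = {};
    orders.forEach(function (o) {
        if (o.date_order <= d1year) return; // the $match stage
        var c = acc[o.id_client] || (acc[o.id_client] = {
            nb_orders_1year: 0, nb_orders_1month: 0, nb_orders_1week: 0
        });
        c.nb_orders_1year += 1;                                // $sum: 1
        if (o.date_order > d1month) c.nb_orders_1month += 1;   // $cond on the month window
        if (o.date_order > d1week)  c.nb_orders_1week += 1;    // $cond on the week window
    });
    return acc;
}
```

One aggregation pass over 50,000,000 orders plus one bulkWrite (setting all three fields in the same $set) then replaces three full pipeline runs and three update rounds.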

