如何从MongoDB中的ChangeStream过滤对特定字段的更新 [英] How do you filter updates to specific fields from ChangeStream in MongoDB

查看:78
本文介绍了如何从MongoDB中的ChangeStream过滤对特定字段的更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在设置一个ChangeStream,以在集合中的文档发生更改时通知我,以便我可以将该文档的"LastModified"元素向上插入事件发生的时间.由于此更新将导致ChangeStream上发生新事件,因此我需要过滤掉这些更新以防止无限循环(更新LastModified元素,因为LastModified元素刚刚被更新...).

I am setting up a ChangeStream to notify me when a document has changed in a collection so that I can upsert the "LastModified" element for that document to the time of the event. Since this update will cause a new event to occur on the ChangeStream, I need to filter out these updates to prevent an infinite loop (updating the LastModified element because the LastModified element was just updated...).

当我指定确切的字段时,我有下面的代码可以正常工作:

I have the following code that is working when I specify the exact field:

ChangeStreamOptions options = new ChangeStreamOptions();
options.ResumeAfter = resumeToken;

string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields.LastModified': { $exists: false } } ] }";
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);

var cursor = collection.Watch(pipeline, options, cancelToken);

但是,我不想提供硬编码"updateDescription.updatedFields.LastModified"的方法,而是要提供我不想在updatedFields文档中存在的元素名称的列表.

However, instead of hard-coding the "updateDescription.updatedFields.LastModified", I would like to provide a list of element names that I don't want to exist in the updatedFields document.

我尝试过:

string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields': { $nin: [ 'LastModified' ] } } ] }";

但是这并没有按预期进行(我仍然得到LastModified更改的更新事件.

but this didn't work as expected (I still got the update events for the LastModified change.

我最初使用的是过滤器生成器:

I originally was using the Filter Builder:

FilterDefinitionBuilder<ChangeStreamDocument<BsonDocument>> filterBuilder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = filterBuilder.In("operationType", new string[] { "replace", "insert", "update" });  //Only include the change if it was one of these types.  Available types are: insert, update, replace, delete, invalidate
filter &= filterBuilder.Nin("updateDescription.updatedFields", ChangedFieldsToIgnore); //If this is an update, only include it if the field(s) updated contains 1+ fields not in the ChangedFieldsToIgnore list

其中ChangedFieldsToIgnore是一个包含我不想为其获取事件的字段名称的列表.

where ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.

任何人都可以提供我需要使用的语法的帮助吗?还是我需要在ChangedFieldsToIgnore列表周围创建循环,并在过滤器中为"$ exists:false"的每个项目创建一个新条目? (这似乎不是很有效).

Can anyone help with the syntax that I need to use? or do I need to create a loop around my ChangedFieldsToIgnore list and create a new entry in the filter for each item to "$exists: false"? (this doesn't seem very efficient).

我根据@ wan-bachtiar的回答尝试了以下代码,但在我的enumerator.MoveNext()调用中遇到了异常:

I attempted the following code based on the answer by @wan-bachtiar, but I'm getting an exception on my enumerator.MoveNext() call:

var match1 = new BsonDocument { { "$match", new BsonDocument { { "operationType", new BsonDocument { { "$in", new BsonArray(new string[] { "replace", "insert", "update" }) } } } } } };
var match2 = new BsonDocument { { "$addFields", new BsonDocument { { "tmpfields", new BsonDocument { { "$objectToArray", "$updateDescription.updatedFields" } } } } } };
var match3 = new BsonDocument { { "$match", new BsonDocument { { "tmpfields.k", new BsonDocument { { "$nin", new BsonArray(updatedFieldsToIgnore) } } } } } };
var pipeline = new[] { match1, match2, match3 };

var cursor = collection.Watch<ChangeStreamDocument<BsonDocument>>(pipeline, options, Profile.CancellationToken);
enumerator = cursor.ToEnumerable().GetEnumerator();

enumerator.MoveNext();
ChangeStreamDocument<BsonDocument> doc = enumerator.Current;

例外是:"{"Invalid field name: \"tmpfields\"."}"

我怀疑问题可能是我正在获取不包含updateDescription字段的替换"和插入"事件,因此$ addFields/$ objectToArray失败了.我太新了,无法弄清楚语法,但是我认为我需要使用一个过滤器来做到这一点:

I suspect the problem might be that I'm getting "replace" and "insert" events which do not contain the updateDescription field, so the $addFields/$objectToArray are failing. I'm too new to figure out the syntax, but I think I need to use a filter that does:

{ $match: { "operationType": { $in: ["replace", "insert"] } } }
OR
{ $eq: { "operationTYpe": "update" }} AND { $addFields....}

此外,C#驱动程序似乎不包含有助于$ addFields和$ objectToArray操作的Builder.我只能使用new BsonDocument {...}方法来构建管道变量.

Also, it appears that the C# driver does not include a Builder that helps with the $addFields and $objectToArray operations. I was only able to use the new BsonDocument {...} method to build the pipeline variable.

推荐答案

ChangedFieldsToIgnore是一个包含我不想为其获取事件的字段名称的列表.

ChangedFieldsToIgnore is a List containing the field names that I do not want to get events for.

如果要基于多个键(updatedFields是否包含某些字段)进行过滤,则先将键转换为值会更容易.

If you would like to filter based on multiple keys (whether updatedFields contains certain fields), it's easier if you convert the keys to values first.

您可以通过使用聚合运算符updatedFields中包含的文档转换为值> $ objectToArray .例如:

You can convert the document contained within updatedFields into values by utilising aggregation operator $objectToArray. For example:

pipeline = [{"$addFields": {
             "tmpfields":{
               "$objectToArray":"$updateDescription.updatedFields"}
            }}, 
            {"$match":{"tmpfields.k":{
                       "$nin":["LastModified", "AnotherUnwantedField"]}}}
];

上述聚合管道添加了一个名为tmpfields的临时字段.此新字段将枢转updateDescription.updatedFields的内容,将{name:value}转换为[{k:name, v:value}].一旦将这些键作为值,我们就可以利用$nin作为过滤器数组.

The above aggregation pipeline adds a temporary field called tmpfields. This new field will pivot the content of updateDescription.updatedFields turning {name:value} into [{k:name, v:value}]. Once we have those keys as values, we can utilise $nin as an array of filter.

已更新

之所以得到tmpfields无效的例外,是因为结果被强制转换为

The reason you're getting an exception of tmpfields being invalid, is because the result is casted into ChangeStreamDocument model which does not have a recognizable field called tmpfields.

在这种情况下,当不同的操作没有字段updateDescription.updatedFields时,tmpfields的值将仅为<​​c13>.

In the case, when it's different operations that does not have field updateDescription.updatedFields, the value of tmpfields would just be null.

下面是使用 MongoDB .Net驱动程序的MongoDB ChangeStream .Net/C#的示例v2.5 ,以及用于修改输出更改流的聚合管道.

Below is an example of MongoDB ChangeStream .Net/C# using MongoDB .Net driver v2.5, along with an aggregation pipeline that modifies the output change stream.

此示例的类型不安全,将返回BsonDocument:

This example is not type safe, and would return BsonDocument :

var database = client.GetDatabase("database");            
var collection = database.GetCollection<BsonDocument>("collection");

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

// Aggregation Pipeline
var addFields = new BsonDocument { 
                    { "$addFields", new BsonDocument { 
                       { "tmpfields", new BsonDocument { 
                         { "$objectToArray", 
                           "$updateDescription.updatedFields" } 
                       } } 
                 } } };
var match = new BsonDocument { 
                { "$match", new BsonDocument { 
                  { "tmpfields.k", new BsonDocument { 
                    { "$nin", new BsonArray{"LastModified", "Unwanted"} } 
            } } } } };

var pipeline = new[] { addFields, match };

// ChangeStreams
var cursor = collection.Watch<BsonDocument>(pipeline, options);

foreach (var change in cursor.ToEnumerable())
{
    Console.WriteLine(change.ToJson());
}

这篇关于如何从MongoDB中的ChangeStream过滤对特定字段的更新的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆