Kafka connector Debezium MongoDB CDC update/$set message without filter (_id value)

Problem description

I am trying to set up syncing from MongoDB to Kudu with the Debezium MongoDB connector. But, as the Debezium docs say and as I found when I tried it myself, there is no filter (_id value) in Debezium MongoDB CDC update/$set messages.

{
    "after": null,
    "patch": "{\"$v\" : 1,\"$set\" : {\"_upts_ratio_average_points\" : {\"$numberLong\" : \"1564645156749\"},\"updatets\" : {\"$numberLong\" : \"1564645156749\"}}}",
    "source": {
        "version": "0.9.5.Final",
        "connector": "mongodb",
        "name": "promongodbdeb05",
        "rs": "mgset-13056897",
        "ns": "strtest.mg_jsd_result_all",
        "sec": 1564645156,
        "ord": 855,
        "h": -1570214265415439167,
        "initsync": false
    },
    "op": "u",
    "ts_ms": 1564648181536
}

I don't understand why it is designed like this: without the filter there is no way to know which document was updated. I downloaded the source code of this connector and tried to fix it. It looks like the class io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope is where the MongoDB oplog message is extracted, with code like the following. This file contains confusing manipulations of both _id and id, and it looks like the connector's committer did intend to include the _id value in the CDC update message. I tried changing valueDocument.append("id", keyDocument.get("id")); to valueDocument.append("id", keyDocument.get("_id"));, but there is still no _id value in the CDC message after rebuilding and redeploying the connector.

Can anyone familiar with Debezium help me with this?

// Excerpt from io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.
// R, flattenStruct and delimiter are declared in the enclosing class.
import java.util.Map.Entry;
import java.util.Set;

import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonValue;

    private BsonDocument getUpdateDocument(R patchRecord, BsonDocument keyDocument) {
        BsonDocument valueDocument = new BsonDocument();
        BsonDocument document = BsonDocument.parse(patchRecord.value().toString());

        // Partial update: start from the fields listed under "$set"
        if (document.containsKey("$set")) {
            valueDocument = document.getDocument("$set");
        }

        if (document.containsKey("$unset")) {
            Set<Entry<String, BsonValue>> unsetDocumentEntry = document.getDocument("$unset").entrySet();

            for (Entry<String, BsonValue> valueEntry : unsetDocumentEntry) {
                // In case unset of a key is false we don't have to do anything with it,
                // if it's true we want to set the value to null
                if (!valueEntry.getValue().asBoolean().getValue()) {
                    continue;
                }
                valueDocument.append(valueEntry.getKey(), new BsonNull());
            }
        }

        if (!document.containsKey("$set") && !document.containsKey("$unset")) {
            if (!document.containsKey("_id")) {
                throw new ConnectException("Unable to process Mongo Operation, a '$set' or '$unset' is necessary " +
                        "for partial updates or '_id' is expected for full Document replaces.");
            }
            // In case of a full update we can use the whole Document as it is
            // see https://docs.mongodb.com/manual/reference/method/db.collection.update/#replace-a-document-entirely
            valueDocument = document;
            valueDocument.remove("_id");
        }

        // keyDocument is built from the record key, whose identifier field is named "id"
        // (it holds the document's _id), so keyDocument.get("_id") would return null.
        if (!valueDocument.containsKey("id")) {
            valueDocument.append("id", keyDocument.get("id"));
        }

        if (flattenStruct) {
            final BsonDocument newDocument = new BsonDocument();
            valueDocument.forEach((fKey, fValue) -> newDocument.put(fKey.replace(".", delimiter), fValue));
            valueDocument = newDocument;
        }

        return valueDocument;
    }
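For reference, getUpdateDocument takes the identifier from the key document's "id" field rather than from the patch. The event shown at the top still carries the raw envelope fields (after, patch, source, op), which suggests the unwrap transform was not configured on that connector; if so, editing this method would not change the emitted messages. The sketch below is a hypothetical, dependency-free mirror of the same merge on plain Java Maps, assuming the patch's "$set" and "$unset" sub-documents have already been parsed by some JSON library. It is not Debezium code, and the class name UpdateRowSketch is invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free mirror of getUpdateDocument's merge step.
// setFields / unsetFields stand for the already-parsed "$set" and "$unset"
// sub-documents of the patch; keyId is the value of the record key's "id" field.
class UpdateRowSketch {

    static Map<String, Object> toRow(String keyId,
                                     Map<String, Object> setFields,
                                     Map<String, Boolean> unsetFields) {
        if (keyId == null) {
            // What a lookup of a non-existent key field (e.g. "_id") would give you.
            throw new IllegalArgumentException("record key carries no 'id' value");
        }
        // Start from the $set fields; LinkedHashMap keeps their order and allows nulls.
        Map<String, Object> row = new LinkedHashMap<>(setFields);
        // "$unset": {"field": true} means the column should become null;
        // false entries are skipped, as in the excerpt.
        unsetFields.forEach((field, unset) -> {
            if (Boolean.TRUE.equals(unset)) {
                row.put(field, null);
            }
        });
        // As in the excerpt, only add "id" when the patch has not set it already.
        if (!row.containsKey("id")) {
            row.put("id", keyId);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> set = new LinkedHashMap<>();
        set.put("_upts_ratio_average_points", 1564645156749L);
        set.put("updatets", 1564645156749L);
        // Sample key value (ObjectId taken from the linked article, not from this record).
        String keyId = "{ \"$oid\" : \"58385328e4b001431e4e497a\" }";
        System.out.println(toRow(keyId, set, Map.of()));
    }
}
```

The resulting map (set fields plus "id") is the shape a Kudu upsert keyed on the document id would need.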

@jiri, thanks a lot for your reply. The message I get always looks like this:

{ "after": null, "patch": "{\"$v\" : 1,\"$set\" : {\"_upts_ratio_average_points\" : {\"$numberLong\" : \"1564645156749\"},\"updatets\" : {\"$numberLong\" : \"1564645156749\"}}}", "source": { "version": "0.9.5.Final", "connector": "mongodb", "name": "promongodbdeb05", "rs": "mgset-13056897", "ns": "strtest.mg_jsd_result_all", "sec": 1564645156, "ord": 855, "h": -1570214265415439167, "initsync": false }, "op": "u", "ts_ms": 1564648181536 }

I searched and found that someone else gets Debezium MongoDB CDC messages containing the _id, as in this article: https://rmoff.net/2018/03/27/streaming-data-from-mongodb-into-kafka-with-kafka-connect-and-debezium/ , for example: { "after": {\"_id\" : {\"$oid\" : \"58385328e4b001431e4e497a\"}, ....

As you can see, I can't get the _id, so there is no way for me to know which document/record the change applies to. In the post above, however, the author appears to get the _id, and from reading the code, _id should be there. I tried both 0.9.5.Final and the 0.7.4 revision used in that post; neither worked for me, and the _id value is always missing.
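A plausible reason the article shows _id while this question does not: in the Debezium MongoDB envelope of these versions, insert ("c") and snapshot-read ("r") events put the full document, including _id, into after, whereas update ("u") events carry only the oplog update in patch (as in the message above), and deletes ("d") carry neither. The record key, by contrast, has an id field for every operation. The following Java sketch encodes that mapping under those assumptions; the class and method names are hypothetical, and it assumes the key has already been deserialized into a Map.

```java
import java.util.Map;

// Hypothetical helper; assumes key/value were already deserialized into Maps.
class MongoEventFields {

    // Which value field carries document data for each Debezium MongoDB op code
    // in the envelope discussed here: "c" (insert) and "r" (snapshot read) put the
    // full document, including _id, into "after"; "u" puts only the oplog update
    // into "patch"; "d" (delete) carries neither.
    static String documentField(String op) {
        switch (op) {
            case "c":
            case "r":
                return "after";
            case "u":
                return "patch";
            case "d":
                return null;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    // The record key has an "id" field holding the _id serialized as JSON,
    // so it is the place that identifies the document for every op.
    static String documentId(Map<String, ?> key) {
        Object id = key.get("id");
        if (id == null) {
            throw new IllegalStateException("record key has no 'id' field: " + key);
        }
        return id.toString();
    }

    public static void main(String[] args) {
        Map<String, String> key = Map.of("id", "{ \"$oid\" : \"58385328e4b001431e4e497a\" }");
        System.out.println(documentField("u") + " / " + documentId(key));
    }
}
```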

Recommended answer

When consuming the topic, you need to add --property print.key=true so that the message key is printed as well. The _id value is in the key part. Thanks.
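Once the key is printed (or read by a consumer), its id field holds the _id as MongoDB extended JSON, for an ObjectId something like { "$oid" : "58385328e4b001431e4e497a" } (sample value from the linked article). Below is a minimal, dependency-free sketch for pulling the hex value out; it assumes the _id is an ObjectId, so other _id types such as numbers or strings return empty, and KeyIdExtractor is a hypothetical name. With the MongoDB Java driver available, wrapping the string as {"_id": <id>} and parsing it with org.bson.BsonDocument.parse would be the more robust choice.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract the ObjectId hex from the "id" string found in the record key.
// A regex is used only to stay dependency-free; it handles ObjectId _ids only.
class KeyIdExtractor {
    // Matches "$oid" : "<24 hex digits>" with optional whitespace around the colon.
    private static final Pattern OID =
            Pattern.compile("\"\\$oid\"\\s*:\\s*\"([0-9a-fA-F]{24})\"");

    static Optional<String> objectIdHex(String keyId) {
        Matcher m = OID.matcher(keyId);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String keyId = "{ \"$oid\" : \"58385328e4b001431e4e497a\" }";
        System.out.println(objectIdHex(keyId).orElse("not an ObjectId"));
    }
}
```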
