MongoDB Kafka Sink连接器不处理RenameByRegex处理器 [英] MongoDB Kafka Sink Connector doesn't process the RenameByRegex processor

查看:146
本文介绍了MongoDB Kafka Sink连接器不处理RenameByRegex处理器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要监听从Kafka主题和接收器到MongoDB中的集合的事件.该消息包含一个具有id属性的嵌套对象,如上例所示.

I need to listening events from a Kafka Topic and Sink to a collection in MongoDB. The message contains an nested object with an id property, like in the example above.

{
    "testId": 1,
    "foo": "bar",
    "foos": [{ "id":"aaaaqqqq-rrrrr" }]
}

我正在尝试使用RegExp将这个嵌套的id重命名为_id

I'm trying to rename this nested id to _id with RegExp

{
        "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "test",
        "connection.uri": "mongodb://mongo:27017",
        "database": "test_db",
        "collection": "test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "value.projection.list":"testId",
        "value.projection.type": "whitelist",
        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder, com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex",
        "field.renamer.regexp": "[{\"regexp\":\"\b(id)\b\", \"pattern\":\"\b(id)\b\",\"replace\":\"_id\"}]"
    }

配置/验证的结果为 500 Internal Server Error ,并显示以下消息:

And the result of a config/validate is 500 Internal Server Error, with that message:

{
    "error_code": 500,
    "message": null
}

我缺少什么或有问题吗?

I missing something or is a issue?

推荐答案

我认为您想要的只是

I think all you want is Kafka Connect Single Message Transform (SMT) and more precisely ReplaceField:

过滤或重命名Struct或Map中的字段.

Filter or rename fields within a Struct or Map.


以下内容将 id 字段名称替换为 _id :

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "id:_id"


以您为例,在应用上述转换之前,您可能还想 Flatten foos :

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."

,最后将转换应用于字段重命名:

and finally apply the transformation for renaming the field:

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "foos.id:foos._id"

这篇关于MongoDB Kafka Sink连接器不处理RenameByRegex处理器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆