Avoid overwriting fields for null values in Confluent Elasticsearch Connector


Problem Description

I have an enrichment pipeline that updates a dynamic number of fields, writes to Kafka, and then sends to Elasticsearch. We are using the Confluent Elasticsearch Connector.

E.g., if the first record sent to the ES Connector is:

{id: 1, name: "Bob", age: null}

and the enriched record is:

{id: 1, name: null, age: 34}

I want the resulting record in Elasticsearch to be:

{id: 1, name: "Bob", age: 34}

The reason the enriched record has to carry an explicit null value (i.e., name: null in the example above) instead of simply omitting the key is that it comes from Avro data, and our schema lists several fields as optional. Since the enrichment pipeline updates a dynamic number of fields, this seemed like the most straightforward solution (i.e., one record might update the name field, while another updates the age field). Since optional Avro fields default to null, this is where our null values come from.
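
For illustration, a hypothetical Avro schema matching the example records (the record name Person is made up); the optional fields are nullable unions defaulting to null, which is where the nulls originate:

{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": ["null", "string"], "default": null},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}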

I tried the write.method=upsert setting as shown in this post, but it still seems to overwrite every field whose value is null in the enriched record; i.e., per the example above, the resulting record in ES looks like {id: 1, name: null, age: 34}. The post linked above seems to have solved this by using multiple Avro schemas for a single record type, which doesn't work for us because it adds too much complexity.
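
For context, a minimal sketch of the sink configuration that was tried; the connector name, topic, and connection.url are placeholders, not from the original post:

name=es-enrichment-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=enriched-records
connection.url=http://localhost:9200
# upsert needs a document id, so the record key must not be ignored
key.ignore=false
write.method=upsert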

I noticed that the ES Connector also has a behavior.on.null.values setting, but my understanding is that this applies when the entire record value is null, not to individual fields.

Is there a setting in the Confluent ES Sink Connector that is something like nullToUnset in the Datastax C* Connector?

If not, is there a good way to implement this?

Recommended Answer

The relevant line of code is here: https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java#L170

This basically means that the source document is sent as is to the index - no modifications.

Your best option is probably to add an SMT that reads the source document and removes any fields with null values.
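
As a rough illustration, here is a minimal sketch of such an SMT; the class name DropNullFields and its package are made up, and it assumes Struct record values with a known schema:

package com.example.kafka.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class DropNullFields<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record; // pass tombstones and schemaless records through untouched
        }
        Struct value = (Struct) record.value();
        Schema schema = value.schema();

        // Rebuild the value schema, keeping only the fields that are non-null in this record.
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (value.get(field) != null) {
                builder.field(field.name(), field.schema());
            }
        }
        Schema prunedSchema = builder.build();

        // Copy the surviving fields into a new Struct.
        Struct prunedValue = new Struct(prunedSchema);
        for (Field field : prunedSchema.fields()) {
            prunedValue.put(field.name(), value.get(field.name()));
        }

        return record.newRecord(
                record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                prunedSchema, prunedValue,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

Registered in the sink configuration along the lines of:

transforms=dropNulls
transforms.dropNulls.type=com.example.kafka.transforms.DropNullFields

Combined with write.method=upsert, the null-valued fields are then simply absent from the partial update, so the values already stored in Elasticsearch are left untouched.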
