在Kafka MySQL源连接器中使用什么属性来为任何架构更改注册新版本 [英] What property to use in kafka mysql source connector to register a new version for any schema change

查看:22
本文介绍了在Kafka MySQL源连接器中使用什么属性来为任何架构更改注册新版本的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是架构-注册表.属性的配置

listeners=http://10.X.X.76:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXXX.1:9092,PLAINTEXT://1XXXX.69:9092
kafkastore.topic=_schemas
debug=false
master.eligibility=true

这是我的连接器

的配置
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "database.user": "cdc_user",
  "tasks.max": "3",
  "database.history.kafka.bootstrap.servers": "10.49.115.249:9092,10.48.130.211:9092,10.54.178.121:9092,10.53.4.69:9092",
  "database.history.kafka.topic": "history.cdc.fkw.supply.mp.seller_facility",
  "database.server.name": "cdc.fkw.supply.mp",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "table.whitelist": "seller_facility.addresses, seller_facility.location, seller_facility.default_location, seller_facility.location_document_mapping",
  "database.hostname": "dog-rr.ffb-supply-ffb-supply-mp.prod.altair.fkcloud.in",
  "database.password": "6X5DpJrVzI",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "name": "cdc.fkw.supply.mp.seller_facility.connector",
  "database.history.skip.unparseable.ddl": "true",
  "errors.tolerance": "all",
  "database.whitelist": "seller_facility",
  "snapshot.mode": "when_needed"
}
当架构中有任何更改时,如何注册新架构? 我可以添加什么属性来执行此操作,以便它只将该特定主题的新版本添加到架构注册表,并且完全兼容。

推荐答案

假设您的key/value.converter使用的是汇流列之一,例如AvroConverter,任何新的/删除的数据库列都将由连接框架自动拾取,并作为KafkaAvroSerializer过程中序列化的一部分注册到注册表。

更改数据库列类型可能会产生错误,例如,将VARCHAR更改为INT

这篇关于在Kafka MySQL源连接器中使用什么属性来为任何架构更改注册新版本的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆