Debezium from MySQL to Postgres with JDBC Sink - change of transforms.route.replacement gives a SinkRecordField error
I am using the debezium-examples setup with the following configuration.
source.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
jdbc-sink.json
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}
This works fine. But after I make the change described in the scenario below, it gives me a 'SinkRecordField' error.
Scenario
I changed this property in the source connector configuration:
"transforms.route.replacement": "my-$2"
Now it creates a topic in Kafka as follows:
my-inventory
When I specify "topics": "my-inventory" in jdbc-sink, it gives me the following exception:
connect_1 | 2019-01-29 10:34:32,218 INFO || Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='email', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='first_name', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='last_name', isPrimaryKey=false}] among column names [street, customer_id, city, state, id, type, zip] [io.confluent.connect.jdbc.sink.DbStructure]
connect_1 | 2019-01-29 10:34:32,220 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='email', isPrimaryKey=false}, as it is not optional and does not have a default value
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:133)
Note: In the database, it creates a table named 'my-inventory'.
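The JDBC sink expects one table per topic, and a single schema (column names and types) per topic as well.
Your regex routing on the Debezium (source) side is effectively dumping every captured table in the inventory database into the "my-inventory" topic (this could include some system tables, although I don't recall that being the default in the config).
Therefore, as soon as more than one table is captured in that topic, you are likely to run into trouble. The log appears to show exactly this: the 'my-inventory' table already has the columns [street, customer_id, city, state, id, type, zip], and a later record carrying the required fields email, first_name and last_name does not fit it, so the sink cannot ALTER the table to add them.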
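To make the collapse described above concrete, here is a minimal Python sketch that approximates what the RegexRouter transform does with the regex from source.json. It is an illustration only, not the Kafka Connect implementation: the route helper is hypothetical, and the topic names (including the addresses table, guessed from the column names in the log) are assumptions.

```python
import re

# The routing pattern from source.json (the JSON string "\\." is the regex "\.").
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic: str, replacement: str) -> str:
    """Hypothetical stand-in for RegexRouter: rewrite only on a full match."""
    match = ROUTE_REGEX.fullmatch(topic)
    if match is None:
        return topic  # topics that do not match keep their original name
    # Convert Java-style "$2" group references to Python's "\g<2>" form.
    template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(template)


# Assumed Debezium topic names of the form <server>.<database>.<table>.
topics = ["dbserver1.inventory.customers", "dbserver1.inventory.addresses"]

for replacement in ("$3", "my-$2", "my-$3"):
    print(replacement, "->", [route(t, replacement) for t in topics])
```

With "$3" each table keeps its own topic, while "my-$2" maps both tables to the same my-inventory topic. If the goal is only to add a prefix, a replacement such as "my-$3" (together with "topics": "my-customers" in jdbc-sink) would keep one topic per table; that value is a suggestion, not something taken from the original configuration.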