Debezium from MySQL to Postgres with JDBC Sink: changing transforms.route.replacement gives a SinkRecordField error

Problem description

I am using this debezium-examples setup:

source.json

{
"name": "inventory-connector",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
}
}

jdbc-sink.json

{
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
}
}

It works fine. But when I made the change described in the following scenario, it gives me a 'SinkRecordField' error.

Scenario

I changed this property in the source config:

    "transforms.route.replacement": "my-$2"

Now it creates the following topic in Kafka:

my-inventory
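To see why "my-$2" produces one topic for the whole database, here is a minimal Java sketch that emulates the renaming. It assumes RegexRouter applies the replacement only when the regex matches the entire topic name (Matcher.matches() followed by replaceFirst), and that Debezium names change-event topics <server>.<database>.<table>. The table names "customers" and "addresses" are illustrative assumptions (the second is inferred from the column list in the log).

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RouteDemo {
    // Same regex as "transforms.route.regex" in source.json, without the JSON escaping.
    static final Pattern TOPIC_PATTERN = Pattern.compile("([^.]+)\\.([^.]+)\\.([^.]+)");

    // Renames a topic the way RegexRouter is assumed to: if the whole name matches,
    // apply the replacement; otherwise leave the topic name unchanged.
    static String route(String topic, String replacement) {
        Matcher m = TOPIC_PATTERN.matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // Assumed Debezium topic names: <server>.<database>.<table>.
        List<String> topics = List.of(
                "dbserver1.inventory.customers",
                "dbserver1.inventory.addresses");
        for (String replacement : List.of("$3", "my-$2")) {
            for (String t : topics) {
                System.out.println(replacement + ": " + t + " -> " + route(t, replacement));
            }
        }
    }
}
```

With "$3" each table keeps its own topic ("customers", "addresses"); with "my-$2" group 2 is the database name, so both tables are renamed to "my-inventory".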

When I set topics=my-inventory in jdbc-sink, it gives me the following exception from [io.confluent.connect.jdbc.sink.DbStructure]:

connect_1    | 2019-01-29 10:34:32,218 INFO   ||  Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='email', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='first_name', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='last_name', isPrimaryKey=false}] among column names [street, customer_id, city, state, id, type, zip]   [io.confluent.connect.jdbc.sink.DbStructure]
connect_1    | 2019-01-29 10:34:32,220 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1    | org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='email', isPrimaryKey=false}, as it is not optional and does not have a default value
connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:133)

Note: In the DB, it creates a table named 'my-inventory'.
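The column list in the log (street, customer_id, city, state, id, type, zip) looks like an address table, so the "my-inventory" table was apparently auto-created from one table's records, and a record from another table then brought email, first_name and last_name. The sketch below is a simplified, hypothetical model of the check that fails; it is not the Confluent DbStructure code, and the Field type and columnsToAdd method are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class AmendCheck {
    // Hypothetical stand-in for a record field: name, optional flag, default value.
    record Field(String name, boolean optional, Object defaultValue) {}

    // Simplified model: every record field without a matching column must be added
    // via ALTER TABLE, which is only possible if the field is optional or has a default.
    static List<String> columnsToAdd(Set<String> tableColumns, List<Field> recordFields) {
        List<String> toAdd = new ArrayList<>();
        for (Field f : recordFields) {
            if (tableColumns.contains(f.name())) {
                continue; // column already exists
            }
            if (!f.optional() && f.defaultValue() == null) {
                throw new IllegalStateException("Cannot ALTER to add missing field '" + f.name()
                        + "', as it is not optional and does not have a default value");
            }
            toAdd.add(f.name());
        }
        return toAdd;
    }

    public static void main(String[] args) {
        // Columns of the auto-created table, as listed in the log.
        Set<String> columns = Set.of("street", "customer_id", "city", "state", "id", "type", "zip");
        // A customer-like record: "id" already exists; the other fields are
        // non-optional strings without defaults, as the log reports.
        List<Field> customerFields = List.of(
                new Field("id", false, null),
                new Field("email", false, null),
                new Field("first_name", false, null),
                new Field("last_name", false, null));
        try {
            columnsToAdd(columns, customerFields);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```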

Solution

The JDBC sink expects one table per topic, and also a single schema (column names and types) per topic.

With the replacement "my-$2", your regex routing on the Debezium (source) side effectively sends every table in the inventory database (possibly including some system tables, although I don't recall that being a default in the config) to the single "my-inventory" topic, because $2 is the database name.

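Therefore, as soon as more than one table is captured in that topic, you are likely to run into trouble. That is what the log shows: the 'my-inventory' table was auto-created with one table's columns (street, customer_id, city, ...), and a record from another table then brought email, first_name and last_name, which the sink could not add because they are not optional and have no default value.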
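One way to check a routing before deploying it is to group the source topics by the topic they would be renamed to, and confirm that no target topic receives more than one table. A minimal Java sketch, assuming RegexRouter applies the replacement only on a full-name match and that Debezium topics are named <server>.<database>.<table>; the table names and the "my-$3" replacement are illustrative choices, not part of the original answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RouteCollisions {
    // Same regex as "transforms.route.regex" in source.json.
    static final Pattern TOPIC_PATTERN = Pattern.compile("([^.]+)\\.([^.]+)\\.([^.]+)");

    // Groups source topics by the topic they would be routed to.
    static Map<String, List<String>> groupByTarget(List<String> sourceTopics, String replacement) {
        Map<String, List<String>> byTarget = new TreeMap<>();
        for (String t : sourceTopics) {
            Matcher m = TOPIC_PATTERN.matcher(t);
            String target = m.matches() ? m.replaceFirst(replacement) : t;
            byTarget.computeIfAbsent(target, k -> new ArrayList<>()).add(t);
        }
        return byTarget;
    }

    // True if every target topic receives exactly one source table,
    // which is what the JDBC sink's one-table-per-topic model needs.
    static boolean oneTablePerTopic(List<String> sourceTopics, String replacement) {
        return groupByTarget(sourceTopics, replacement).values().stream()
                .allMatch(tables -> tables.size() == 1);
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "dbserver1.inventory.customers",
                "dbserver1.inventory.addresses");
        for (String r : List.of("my-$2", "my-$3")) {
            System.out.println(r + " -> " + groupByTarget(topics, r)
                    + ", one table per topic: " + oneTablePerTopic(topics, r));
        }
    }
}
```

With a per-table replacement such as "my-$3", each table gets its own topic ("my-customers", "my-addresses"), and the sink's topics setting would then list those topics instead of "my-inventory".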
