Kafka Connect JDBC接收器报价.SQL.IDENTIFIERS不工作 [英] Kafka Connect JDBC Sink quote.sql.identifiers not working

查看:30
本文介绍了Kafka Connect JDBC接收器报价.SQL.IDENTIFIERS不工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Kafka Connect使用JDBC源和宿连接器将数据从旧的DB2数据库同步到Postgres数据库。它工作得很好,但前提是我必须严格控制用于表名的大小写。

例如,我在DB2中有一个名为action的表,它也存在于postgres中,具有相同的列,等等。唯一的区别是在DB2中它是大写的ACTION,而在postgres中它是小写的action

以下是一个有效的接收器文件:

{
    "name": "jdbc_sink_pg_action",
    "config": {
        "_comment": "The JDBC connector class",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        "_comment": "How to serialise the value of keys ",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",


        "_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "_comment": " --- JDBC-specific configuration below here  --- ",

        "_comment": "JDBC connection URL.",
        "connection.url": "jdbc:postgresql://localhost:5435/postgres",
        "connection.user": "postgres",
        "connection.password": "*****",

        "topics": "ACTION",
        "table.name.format": "action",

        "_comment": "The insertion mode to use",
        "insert.mode": "upsert",

        "_comment": "The primary key mode",
        "pk.mode": "record_value",

        "_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
        "pk.fields": "ACTION_ID",

        "quote.sql.identifiers": "never"
    }
}

这还可以,但不太灵活。例如,我还有许多其他表,我也想同步它们,但我不想为每个表创建一个连接器文件。因此,我尝试使用:

"table.name.format": "${topic}",

执行此操作时,当我尝试加载接收器连接器时,日志中出现以下错误:

起因:org.apache.kafka.connect.errors.ConnectException:表"操作" 丢失并且自动创建被禁用

所以在我看来"quote.sql.identifiers": "never"实际上不起作用,否则接收器连接器正在执行的查询将不带引号,并且它将允许任何情况(它将转换为更低)。

为什么这不起作用?如果我只使用ACTION作为able.name.Format,则得到相同的结果。

推荐答案

您的postgresql表名(action)与主题名(ACTION)不相同。 Kafka Connect JDBC ConnectorusesgetTables()方法检查表是否存在,其中tableNamePatternparam区分大小写(根据文档:must match the table name as it is stored in the database)。

您可以使用ChangeTopicCase中的Kafka Connect Common Transformations转换。

这篇关于Kafka Connect JDBC接收器报价.SQL.IDENTIFIERS不工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆