使用 kafka-connect 从多个主题更新到多个表 [英] Upserting into multiple tables from multiples topics using kafka-connect

查看:48
本文介绍了使用 kafka-connect 从多个主题更新到多个表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 JDBC 接收器连接器读取 2 个 kafka 主题并将其更新插入到我手动创建的 2 个 Oracle 表中.每个表都有 1 个主键,我想在 upsert 模式下使用它.如果我仅用于 1 个主题且 pk.fields 中只有 1 个字段,则连接器工作正常,但如果我在 pk.fields 中输入多个列,则每个表中的一个列无法识别架构.我是否遗漏了任何东西,请提出建议.

I am trying to read 2 kafka topics using JDBC sink connector and upsert into 2 Oracle tables which I manually created it. Each table has 1 primary key I want to use it in upsert mode. Connector works fine if I use only for 1 topic and only 1 field in pk.fields but if I enter multiple columns in pk.fields one from each table it fails to recognize the schema. Am I missing any thing please suggest.

name=oracle_sink_prod
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=KAFKA1011,JAFKA1011
connection.url=URL
connection.user=UID
connection.password=PASSWD
auto.create=false
table.name.format=KAFKA1011,JAFKA1011
pk.mode=record_value
pk.fields= ID,COMPANY 
auto.evolve=true
insert.mode=upsert

//ID is pk of kafka1011 table and COMPANY is of other

推荐答案

如果 PK 不同,只需创建两个不同的 sink 连接器.它们都可以在同一个 Kafka Connect 工作器上运行.

If the PK are different, just create two different sink connectors. They can both run on the same Kafka Connect worker.

您还可以选择使用 Kafka 消息本身的密钥.有关详细信息,请参阅 doc.这是更具可扩展性的选项,然后您只需要确保您的消息被正确键入以使其向下流到 JDBC Sink.

You also have the option of using the key of the Kafka message itself. See doc for more info. This is the more scalable option, and you would then just need to ensure that your messages were keyed correctly for this to flow down to the JDBC Sink.

这篇关于使用 kafka-connect 从多个主题更新到多个表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆