How to ingest 250 tables into Kafka from MS SQL with Debezium

Problem description

I am trying to build a Kafka Connect pipeline with PostgreSQL as the source and SQL Server as the destination. I use 3 Kafka brokers and need to consume 252 topics (one topic per PostgreSQL table). After running for more than an hour, the pipeline had pulled only 218 of the 252 tables. The errors I found are deadlocks in SQL Server, where a transaction is held and then retried, and a Debezium replication slot that already exists.

I use distributed connectors with a maximum of 3 workers on the sink side, but that does not seem to be enough. I also tried raising offset.time_out.ms to 60000 and using more offset partitions (100). I am afraid this is not the production-grade setup I want. Can anyone give a suggestion for this case? Is there a way to calculate the number of workers I need?

Update

Here are some of the errors I get. I see that some connectors were killed. One of them tells me that a deadlock happened in SQL Server:

[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

Update, April 14, 2020

I still have this problem, and I forgot to explain how I deploy the connectors. I now use 2 workers, one for the source and one for the sink. I list all of my tables and their primary keys in a CSV file and loop through the rows to create the connectors, without any sleep or wait between them. I also use a single partition and 3 replicas for each topic. But I still get SQL Server deadlocks.
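
For reference, a deployment loop like the one described could look roughly like the sketch below. It is not the original script. The CSV column names (`table`, `pk`), the topic prefix, the JDBC URL, and the choice of `upsert` with `pk.mode=record_key` are assumptions made for illustration; the `sql_server_sink_` name prefix mirrors the connector id in the log above. The optional pause shows where a delay between requests would go.

```python
import csv
import io
import json
import time
import urllib.request

# Hypothetical placeholder: point this at your own SQL Server database.
JDBC_URL = "jdbc:sqlserver://sqlserver-host:1433;databaseName=target_db"


def build_sink_config(table, pk, topic_prefix="pg.public."):
    """Build the JSON body for one JDBC sink connector (one per table)."""
    return {
        "name": f"sql_server_sink_{table}",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": f"{topic_prefix}{table}",  # assumed topic naming
            "connection.url": JDBC_URL,
            "table.name.format": table,
            "insert.mode": "upsert",             # assumption
            "pk.mode": "record_key",             # assumption
            "pk.fields": pk,
            "tasks.max": "1",                    # one writer per table
        },
    }


def configs_from_csv(text):
    """Read rows with 'table' and 'pk' columns and return connector bodies."""
    reader = csv.DictReader(io.StringIO(text))
    return [build_sink_config(row["table"], row["pk"]) for row in reader]


def create_connector(connect_url, body, pause_s=0.0):
    """POST one connector to the Kafka Connect REST API, then optionally wait."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        status = response.status
    time.sleep(pause_s)
    return status
```

A call such as `create_connector("http://localhost:8083", body, pause_s=5)` for each body from `configs_from_csv(...)` would create the connectors one at a time; the worker URL here is also a placeholder.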

Recommended answer

The problem may be that multiple tasks access the same SQL table at the same time, which causes synchronization problems such as the deadlocks you mentioned.
Since you already have a large number of topics, and your connector can access them in parallel, I would suggest reducing the number of partitions for every topic to just 1. Kafka does not support reducing the number of partitions, so you would have to delete each topic and recreate it with the new partition count.
This way, every topic has only one partition, and a partition can be consumed by only a single thread (task/consumer), so there is no chance of parallel SQL transactions against the same table.
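
To make the reasoning concrete, here is a minimal sketch of how partitions limit concurrency. It uses a plain round-robin assignment as a simplified model, not Kafka's actual assignor, and the function names are made up for this example.

```python
def assign_partitions(num_partitions, num_tasks):
    """Spread partitions over tasks round-robin (simplified model)."""
    assignment = {task: [] for task in range(num_tasks)}
    for partition in range(num_partitions):
        assignment[partition % num_tasks].append(partition)
    return assignment


def concurrent_writers(num_partitions, num_tasks):
    """Count tasks that own at least one partition and can therefore write."""
    assignment = assign_partitions(num_partitions, num_tasks)
    return sum(1 for partitions in assignment.values() if partitions)


# A topic with 3 partitions and 3 tasks: three tasks may write to the table.
print(concurrent_writers(3, 3))  # 3
# A topic with 1 partition and 3 tasks: only one task ever writes.
print(concurrent_writers(1, 3))  # 1
```

With a single partition, the extra tasks stay idle for that topic, which is what removes the parallel transactions against one table.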

Alternatively, a better approach is to create a single topic with 3 partitions (the same as the number of tasks/consumers you have) and make the producer use the SQL table name as the message key.
Kafka guarantees that messages with the same key always go to the same partition, so all messages for a given table will reside on a single partition and be consumed by a single thread.
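
The key-to-partition idea can be sketched without a broker. The example below is only an illustration: it uses CRC32 as a stand-in hash, whereas Kafka's default partitioner hashes the key bytes with murmur2, and the table names and payloads are invented.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Map a message key to a partition (stand-in hash, not Kafka's murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(records, num_partitions=3):
    """Group (table_name, payload) records by the partition their key maps to."""
    partitions = defaultdict(list)
    for table, payload in records:
        partitions[partition_for(table, num_partitions)].append((table, payload))
    return dict(partitions)


# Hypothetical change events keyed by table name.
records = [
    ("orders", {"id": 1}),
    ("customers", {"id": 7}),
    ("orders", {"id": 2}),
    ("invoices", {"id": 3}),
    ("customers", {"id": 8}),
]

for partition, items in sorted(route(records).items()):
    print(partition, [table for table, _ in items])
```

Whatever the hash function, the property that matters is the same: every record keyed with a given table name lands in one partition, so one consumer thread handles all writes for that table, in order.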

If you find it useful, I can attach more information about how to create a Kafka producer and send keyed messages.
