如何使用Kafka Connect将MS Sql Server表导入KSQL [英] How to import MS Sql Server tables to KSQL with Kafka connect
问题描述
我正在尝试将远程SQL Server上存在的所有表导入KSQL主题
,这是我的文件属性
Hi I am trying to import all tables present on remote SQL Server to KSQL topics this is my file properties
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
name=sqlservertest
tasks.max=1
initial.database=$$DATABASE
connection.url=jdbc:sqlserver://$$IP:1433;databaseName=$$DATABASE;user=$$USER;
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
topic.prefix=sqlservertest
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mode=bulk
auto.create=true
auto.evolve=true
比我做
confluent load sqlservertest -d /opt/kakfkaconf/sqlservertest.properties
并在日志中
confluent log connect -f
它显示
[2018-10-10 14:18:43,856] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
它运行正确,但是没有导入任何内容,主题仍然为空
it run correctly but it doesn't import anything, the topic remain empty
confluent status sqlservertest
{
"name": "sqlservertest",
"connector": {
"state": "RUNNING",
"worker_id": "10.132.0.2:8083"
},
"tasks": [],
"type": "source"
}
我也更改了物业
name=mssql
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=2
initial.database=$$DB
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
change.tracking.tables=$$SCHEMA.$$TABLE
auto.create=true
auto.evolve=true
topic.prefix=$$DB
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
但我遇到此错误
[2018-10-10 15:06:09,216] ERROR Exception thrown while querying for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE} (io.confluent.connect.cdc.mssql.QueryService:94)
org.apache.kafka.connect.errors.DataException: Exception thrown while getting metadata for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE}
at io.confluent.connect.cdc.CachingTableMetadataProvider.tableMetadata(CachingTableMetadataProvider.java:64)
at io.confluent.connect.cdc.mssql.QueryService.queryTable(QueryService.java:108)
at io.confluent.connect.cdc.mssql.QueryService.processTables(QueryService.java:92)
at io.confluent.connect.cdc.mssql.QueryService.run(QueryService.java:67)
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547)
... 11 more
推荐答案
我发现了此错误,Kafka连接器正在使用MS Sql Server 2012中仅存在的功能,特别是功能中的IFF和布尔值比较
I found the real cause of this error, Kafka connector are using the functions that is present just in MS Sql server 2012, in particular IFF and boolan comparison in function
select IFF(1>2,'OK','KO');
select (1>2) as bool;
在MS Sql 2008上不起作用的
that are NOT working on MS Sql 2008
真正的原因是Conflunet MSSQL连接器仅适用于MS SQL Server 2012及更高版本,而我正在运行版本2008
我反编译了库 kafka-connect-cdc-mssql ,并调整了sql代码使其与sqlserver 2008兼容,现在它可以正常工作。
I decompiled the library kafka-connect-cdc-mssql and adjusted the sql code to be compliant with sqlserver 2008 and now it's working.
也许我会将其推送到github以供所有人使用
Maybe I will push it to github to make it available for everybody
这篇关于如何使用Kafka Connect将MS Sql Server表导入KSQL的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!