如何使用Kafka Connect将MS Sql Server表导入KSQL [英] How to import MS Sql Server tables to KSQL with Kafka connect

查看:413
本文介绍了如何使用Kafka Connect将MS Sql Server表导入KSQL的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将远程SQL Server上存在的所有表导入KSQL主题
,这是我的文件属性

Hi I am trying to import all tables present on remote SQL Server to KSQL topics this is my file properties

connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
name=sqlservertest
tasks.max=1
initial.database=$$DATABASE
connection.url=jdbc:sqlserver://$$IP:1433;databaseName=$$DATABASE;user=$$USER;
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
topic.prefix=sqlservertest
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mode=bulk
auto.create=true
auto.evolve=true

比我做

confluent load sqlservertest -d /opt/kakfkaconf/sqlservertest.properties

并在日志中

confluent log connect -f

它显示

[2018-10-10 14:18:43,856] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)

它运行正确,但是没有导入任何内容,主题仍然为空

it run correctly but it doesn't import anything, the topic remain empty

confluent status sqlservertest 
{
  "name": "sqlservertest",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.132.0.2:8083"
  },
  "tasks": [],
  "type": "source"
}

我也更改了物业

name=mssql
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=2
initial.database=$$DB
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
change.tracking.tables=$$SCHEMA.$$TABLE
auto.create=true
auto.evolve=true
topic.prefix=$$DB
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

但我遇到此错误

[2018-10-10 15:06:09,216] ERROR Exception thrown while querying for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE} (io.confluent.connect.cdc.mssql.QueryService:94)
org.apache.kafka.connect.errors.DataException: Exception thrown while getting metadata for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE}
        at io.confluent.connect.cdc.CachingTableMetadataProvider.tableMetadata(CachingTableMetadataProvider.java:64)
        at io.confluent.connect.cdc.mssql.QueryService.queryTable(QueryService.java:108)
        at io.confluent.connect.cdc.mssql.QueryService.processTables(QueryService.java:92)
        at io.confluent.connect.cdc.mssql.QueryService.run(QueryService.java:67)
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
        at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
        at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547)
        ... 11 more


推荐答案

我发现了此错误,Kafka连接器正在使用MS Sql Server 2012中仅存在的功能,特别是功能中的IFF和布尔值比较

I found the real cause of this error, Kafka connector are using the functions that is present just in MS Sql server 2012, in particular IFF and boolan comparison in function

select IFF(1>2,'OK','KO');
select (1>2) as bool;

在MS Sql 2008上不起作用的

that are NOT working on MS Sql 2008

真正的原因是Conflunet MSSQL连接器仅适用于MS SQL Server 2012及更高版本,而我正在运行版本2008

我反编译了库 kafka-connect-cdc-mssql ,并调整了sql代码使其与sqlserver 2008兼容,现在它可以正常工作。

I decompiled the library kafka-connect-cdc-mssql and adjusted the sql code to be compliant with sqlserver 2008 and now it's working.

也许我会将其推送到github以供所有人使用

Maybe I will push it to github to make it available for everybody

这篇关于如何使用Kafka Connect将MS Sql Server表导入KSQL的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆