如何在Kafka Connect JDBC Source连接器中添加显式WHERE子句 [英] How to add explicit WHERE clause in Kafka Connect JDBC Source connector

查看:684
本文介绍了如何在Kafka Connect JDBC Source连接器中添加显式WHERE子句的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用kafka connect从DB2到kafka主题连接源数据,并且我正在配置sql查询以从DB2读取数据,下面是查询

I am using kafka connect to source data from DB2 to kafka topic and i am configuring sql query to read the data from DB2 , below is query

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'

am使用设置"timestamp.column.name": "CREATE_TS"在这里问题是在查询中他们已经是WHERE子句,而kafka connect尝试添加另一个带有timestamp列的where子句,这正在创建问题,另一个问题是如果我删除where子句来自sql子句,如下所示

am using setting "timestamp.column.name": "CREATE_TS" here problem is in the query their is already WHERE clause , and kafka connect tried to add another where clause with timestamp column and it is creating issue and one more issue is if i remove where clause from sql clause like below

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR

然后使用substr出错,如下所示

then am getting error with substr , like below

SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26

谁能在这两个问题上都提出建议,就此困住了.

can anyone suggest on both is this issues , am stuck at this point .

推荐答案

之所以会发生这种情况,是因为您尝试同时使用"mode": "timestamp"query. TimestampIncrementingTableQuerierWHERE子句追加到查询中,该子句与query中的现有WHERE子句冲突.

This happens because you are trying to use both "mode": "timestamp" and query. TimestampIncrementingTableQuerier appends a WHERE clause to the query that conflicts with existing WHERE clauses in the query.

JDBC源连接器文档对此很清楚:

query

如果指定,则执行查询以选择新的或更新的行.使用 此设置(如果要联接表),请选择 表格或过滤器数据.如果使用,此连接器将仅复制数据 使用此查询-将禁用整个表的复制.不同的 查询模式可能仍用于增量更新,但为了 正确构造增量查询,必须有可能 将WHERE子句添加到此查询(即,不得将WHERE子句 用过的). 如果您使用WHERE子句,则它必须处理增量查询 本身.

If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query -- whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.

作为一种解决方法,您可以将查询修改为(取决于您使用的是哪种SQL风格)

As a workaround, you can modify your query to (depending on what SQL flavour are you using)

SELECT * FROM ( SELECT * FROM table WHERE ...)

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的情况下,查询应为

For example, in your case the query should be

"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"

这篇关于如何在Kafka Connect JDBC Source连接器中添加显式WHERE子句的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆