如何在Kafka Connect JDBC Source连接器中添加显式WHERE子句 [英] How to add explicit WHERE clause in Kafka Connect JDBC Source connector
问题描述
我正在使用kafka connect从DB2到kafka主题连接源数据,并且我正在配置sql查询以从DB2读取数据,下面是查询
I am using kafka connect to source data from DB2 to kafka topic and i am configuring sql query to read the data from DB2 , below is query
SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'
am使用设置"timestamp.column.name": "CREATE_TS"
在这里问题是在查询中他们已经是WHERE
子句,而kafka connect尝试添加另一个带有timestamp列的where子句,这正在创建问题,另一个问题是如果我删除where子句来自sql子句,如下所示
am using setting "timestamp.column.name": "CREATE_TS"
here problem is in the query their is already WHERE
clause , and kafka connect tried to add another where clause with timestamp column and it is creating issue and one more issue is if i remove where clause from sql clause like below
SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR
然后使用substr出错,如下所示
then am getting error with substr , like below
SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26
谁能在这两个问题上都提出建议,就此困住了.
can anyone suggest on both is this issues , am stuck at this point .
推荐答案
之所以会发生这种情况,是因为您尝试同时使用"mode": "timestamp"
和query
. TimestampIncrementingTableQuerier
将WHERE
子句追加到查询中,该子句与query
中的现有WHERE
子句冲突.
This happens because you are trying to use both "mode": "timestamp"
and query
. TimestampIncrementingTableQuerier
appends a WHERE
clause to the query that conflicts with existing WHERE
clauses in the query
.
JDBC源连接器文档对此很清楚:
query
如果指定,则执行查询以选择新的或更新的行.使用 此设置(如果要联接表),请选择 表格或过滤器数据.如果使用,此连接器将仅复制数据 使用此查询-将禁用整个表的复制.不同的 查询模式可能仍用于增量更新,但为了 正确构造增量查询,必须有可能 将WHERE子句添加到此查询(即,不得将WHERE子句 用过的). 如果您使用WHERE子句,则它必须处理增量查询 本身.
If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query -- whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.
作为一种解决方法,您可以将查询修改为(取决于您使用的是哪种SQL风格)
As a workaround, you can modify your query to (depending on what SQL flavour are you using)
SELECT * FROM ( SELECT * FROM table WHERE ...)
或
WITH a AS
SELECT * FROM b
WHERE ...
SELECT * FROM a
例如,在您的情况下,查询应为
For example, in your case the query should be
"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"
这篇关于如何在Kafka Connect JDBC Source连接器中添加显式WHERE子句的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!