如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句 [英] How to add explicit WHERE clause in Kafka Connect JDBC Source connector

查看:20
本文介绍了如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 kafka 连接从 DB2 到 kafka 主题的源数据,我正在配置 sql 查询以从 DB2 读取数据,下面是查询

I am using kafka connect to source data from DB2 to kafka topic and i am configuring sql query to read the data from DB2 , below is query

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'

am using setting "timestamp.column.name": "CREATE_TS" 这里的问题是在查询中他们已经是 WHERE 子句,而 kafka connect 试图添加另一个带有时间戳列的 where 子句,它正在创建问题,还有一个问题是,如果我从 sql 子句中删除 where 子句,如下所示

am using setting "timestamp.column.name": "CREATE_TS" here problem is in the query their is already WHERE clause , and kafka connect tried to add another where clause with timestamp column and it is creating issue and one more issue is if i remove where clause from sql clause like below

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR

然后出现 substr 错误,如下所示

then am getting error with substr , like below

SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26

任何人都可以就这两个问题提出建议,我卡在这一点上.

can anyone suggest on both is this issues , am stuck at this point .

推荐答案

发生这种情况是因为您试图同时使用 "mode": "timestamp"query.TimestampIncrementingTableQuerierWHERE 子句附加到与 query 中现有的 WHERE 子句冲突的查询中.

This happens because you are trying to use both "mode": "timestamp" and query. TimestampIncrementingTableQuerier appends a WHERE clause to the query that conflicts with existing WHERE clauses in the query.

JDBC 源连接器文档 对此很清楚:

查询

如果指定,则为选择新行或更新行而执行的查询.用这个设置如果你想连接表,选择列的子集表或过滤数据.如果使用,此连接器将仅复制数据使用这个查询——全表复制将被禁用.不同的查询模式仍可用于增量更新,但为了正确构造增量查询,必须可以将 WHERE 子句附加到此查询(即不能有 WHERE 子句用过的).如果使用 WHERE 子句,它必须处理增量查询本身.

If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query -- whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.


作为一种解决方法,您可以将查询修改为(取决于您使用的 SQL 风格)


As a workaround, you can modify your query to (depending on what SQL flavour are you using)

SELECT * FROM ( SELECT * FROM table WHERE ...)

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的情况下,查询应该是

For example, in your case the query should be

"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"

这篇关于如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆