Is it possible to use a custom query with bulk mode in Kafka Connect?

Problem description

I'm trying to send a record for every row that is 7 days old. This is the configuration I was working with, but it doesn't work even though the query produces records on the DB server.

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "mode": "bulk",
    "connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
    "query": "SELECT * FROM test_table WHERE DATEDIFF(CURDATE(), test_table.modified) = 7;",
    "topic.prefix": "test-jdbc-",
    "poll.interval.ms": 10000
}

Recommended answer

The JDBC source connector imports data from a relational database into an Apache Kafka topic using a JDBC driver. Data is loaded periodically, either incrementally based on a timestamp or as a bulk load. Regardless of whether you choose incremental or bulk mode, the connector loads all existing data into the topic when it is first created; after that, incremental mode loads only new or modified rows based on the timestamp column.

Bulk: This mode is unfiltered and therefore not incremental at all. It loads all rows from the table on each iteration. It can be useful if you want to periodically dump an entire table where entries are eventually deleted, and the downstream system can safely handle duplicates. That means you cannot load only the last 7 days incrementally using bulk mode.

Timestamp column: In this mode, a single column containing a modification timestamp is used to track the last time data was processed, and only rows that have been modified since that time are queried. This is how you load data incrementally. On the first run after the connector is created, it loads all data available in the table, because to the JDBC connector everything is new; later it loads only new or modified rows.
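
For illustration, here is a minimal sketch of a timestamp-mode connector for the same table, assuming test_table has a non-null modified DATETIME column (the table, column, and connection details are carried over from the question; adjust them for your schema):

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
    "mode": "timestamp",
    "timestamp.column.name": "modified",
    "table.whitelist": "test_table",
    "topic.prefix": "test-jdbc-",
    "poll.interval.ms": 10000
}

After the initial load, this emits only rows whose modified value advances past the connector's stored offset.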

Now, per your requirement, it seems you are trying to load all matching rows at a fixed interval, configured via "poll.interval.ms": 10000. Your JDBC connection settings look correct, but the query may be the problem; try the one below. The JDBC connector wraps the query as a subquery (treating it as a table), which can break when the query contains its own WHERE clause.

"query": "select * from (select * from test_table where  modified > now() - interval '7' day) o",

Try the following settings:

{
  "name": "application_name",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:mysql://mysql:3300/test_db",
  "connection.user": "root",
  "connection.password": "password",
  "connection.attempts": "1",
  "mode": "bulk",
  "validate.non.null": false,
  "query": "select * from (select * from test_table where modified > now() - interval '7' day) o",
  "table.types": "TABLE",
  "topic.prefix": "test-jdbc-",
  "poll.interval.ms": 10000,
  "schema.ignore": true,
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false"
}
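
Assuming a Kafka Connect worker with its REST API on the default port 8083, you can register (or update) the connector by saving the JSON above as connector.json and calling the REST API; the connector name in the URL must match the "name" field:

curl -X PUT -H "Content-Type: application/json" \
     --data @connector.json \
     http://localhost:8083/connectors/application_name/config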
