sqlite3的Kafka JDBC源连接器时间戳模式失败 [英] Kafka JDBC source connector time stamp mode failing for sqlite3
问题描述
我试图在sqlite中建立一个带有两个表的数据库。我的表中曾经有一个timestamp列。我正在尝试实现时间戳模式以捕获数据库中的增量更改。 Kafka connect失败,并出现以下错误:
I tried to set up a database with two tables in sqlite. Once of my table is having a timestamp column . I am trying to implement timestamp mode to capture incremental changes in the DB. Kafka connect is failing with the below error:
ERROR Failed to get current time from DB using Sqlite and query 'SELECT
CURRENT_TIMESTAMP'
(io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect:471)
java.sql.SQLException: Error parsing time stamp
Caused by: java.text.ParseException: Unparseable date: "2019-02-05 02:05:29"
does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q
\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)
非常感谢您的帮助
配置:
name=test-query-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:employee.db
query=SELECT users.id, users.name, transactions.timestamp, transactions.payment_type FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=test-joined
DDL:
CREATE TABLE transactions(id integer primary key not null,
payment_type text not null,
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
user_id int not null,
constraint fk foreign key(user_id) references users(id)
);
CREATE TABLE users (id integer primary key not null,name text not null);
推荐答案
kafka connect jdbc连接器可轻松检测到
The kafka connect jdbc connector easily detects the changes in the timestamp, if the values of the 'timestamp' column are in the format of the 'UNIX timestamp'.
sqlite> CREATE TABLE transact(timestamp TIMESTAMP DEFAULT (STRFTIME('%s', 'now')) not null,
...> id integer primary key not null,
...> payment_type text not null);
sqlite>
值可以插入为:
sqlite> INSERT INTO transact(timestamp,payment_type,id) VALUES (STRFTIME('%s', 'now'),'cash',1);
然后,与时间戳相关的更改由kafka jdbc源连接器检测到,可以按如下方式使用:
The timestamp related changes are then detected by the kafka jdbc source connector and the same can be consumed as follows:
kafka-console-consumer --bootstrap-server localhost:9092 --topic jdbc-transact --from-beginning
{"timestamp":1562321516,"id":2,"payment_type":"card"}
{"timestamp":1562321790,"id":1,"payment_type":"online"}
这篇关于sqlite3的Kafka JDBC源连接器时间戳模式失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!